Repository: kafka
Updated Branches:
  refs/heads/trunk c4e59a338 -> 41c0f8add
KAFKA-5049; Chroot check should be done for each ZkUtils instance Author: anukin <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2857 from anukin/KAFKA_5049_zkroot_check Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41c0f8ad Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41c0f8ad Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41c0f8ad Branch: refs/heads/trunk Commit: 41c0f8addeef44be23524bade61bcb2ab6077706 Parents: c4e59a3 Author: anukin <[email protected]> Authored: Wed Apr 19 11:25:30 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 19 11:30:55 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/utils/ZkUtils.scala | 46 ++++++++++---------- .../test/scala/unit/kafka/zk/ZKPathTest.scala | 18 ++++---- 2 files changed, 33 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 724414e..4e2b11a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -65,8 +65,6 @@ object ZkUtils { val ConfigChangesPath = s"$ConfigPath/changes" val ConfigUsersPath = s"$ConfigPath/users" val PidBlockPath = "/latest_pid_block" - - // Important: it is necessary to add any new top level Zookeeper path to the Seq val SecureZkRootPaths = Seq(AdminPath, BrokersPath, @@ -243,6 +241,9 @@ class ZkUtils(val zkClient: ZkClient, IsrChangeNotificationPath, PidBlockPath) + // Visible for testing + val zkPath = new ZkPath(zkClient) + import ZkUtils._ @deprecated("This is deprecated, use defaultAcls(path) which doesn't make 
sensitive data world readable", since = "0.10.2.1") @@ -451,7 +452,7 @@ class ZkUtils(val zkClient: ZkClient, } if (!zkClient.exists(path)) - ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException + zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException } /** @@ -461,7 +462,7 @@ class ZkUtils(val zkClient: ZkClient, val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls val parentDir = path.substring(0, path.lastIndexOf('/')) if (parentDir.length != 0) { - ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl) + zkPath.createPersistent(parentDir, createParents = true, acl) } } @@ -471,11 +472,11 @@ class ZkUtils(val zkClient: ZkClient, private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { - ZkPath.createEphemeral(zkClient, path, data, acl) + zkPath.createEphemeral(path, data, acl) } catch { case _: ZkNoNodeException => createParentPath(path) - ZkPath.createEphemeral(zkClient, path, data, acl) + zkPath.createEphemeral(path, data, acl) } } @@ -512,17 +513,17 @@ class ZkUtils(val zkClient: ZkClient, def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { - ZkPath.createPersistent(zkClient, path, data, acl) + zkPath.createPersistent(path, data, acl) } catch { case _: ZkNoNodeException => createParentPath(path) - ZkPath.createPersistent(zkClient, path, data, acl) + zkPath.createPersistent(path, data, acl) } } def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - 
ZkPath.createPersistentSequential(zkClient, path, data, acl) + zkPath.createPersistentSequential(path, data, acl) } /** @@ -538,7 +539,7 @@ class ZkUtils(val zkClient: ZkClient, case _: ZkNoNodeException => createParentPath(path) try { - ZkPath.createPersistent(zkClient, path, data, acl) + zkPath.createPersistent(path, data, acl) } catch { case _: ZkNodeExistsException => zkClient.writeData(path, data) @@ -608,7 +609,7 @@ class ZkUtils(val zkClient: ZkClient, } catch { case _: ZkNoNodeException => createParentPath(path) - ZkPath.createEphemeral(zkClient, path, data, acl) + zkPath.createEphemeral(path, data, acl) } } @@ -993,11 +994,12 @@ class ZKConfig(props: VerifiableProperties) { val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000) } -object ZkPath { +class ZkPath(client: ZkClient) { + @volatile private var isNamespacePresent: Boolean = false - def checkNamespace(client: ZkClient) { - if(isNamespacePresent) + def checkNamespace() { + if (isNamespacePresent) return if (!client.exists("/")) { @@ -1010,23 +1012,23 @@ object ZkPath { isNamespacePresent = false } - def createPersistent(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) { - checkNamespace(client) + def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) { + checkNamespace() client.createPersistent(path, data, acls) } - def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: java.util.List[ACL]) { - checkNamespace(client) + def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) { + checkNamespace() client.createPersistent(path, createParents, acls) } - def createEphemeral(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) { - checkNamespace(client) + def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) { + checkNamespace() client.createEphemeral(path, data, acls) } - def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: 
java.util.List[ACL]): String = { - checkNamespace(client) + def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = { + checkNamespace() client.createPersistentSequential(path, data, acls) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index d8f0de4..04f8aaf 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -18,7 +18,7 @@ package kafka.zk import kafka.consumer.ConsumerConfig -import kafka.utils.{ZkPath, TestUtils, ZkUtils} +import kafka.utils.{TestUtils, ZkUtils} import org.apache.kafka.common.config.ConfigException import org.junit.Assert._ import org.junit.Test @@ -36,7 +36,7 @@ class ZKPathTest extends ZooKeeperTestHarness { val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createPersistentPath(path) fail("Failed to throw ConfigException for missing zookeeper root node") } catch { @@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testCreatePersistentPath { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createPersistentPath(path) assertTrue("Failed to create persistent path", zkUtils.pathExists(path)) zkUtils.close() @@ -60,7 +60,7 @@ class ZKPathTest extends ZooKeeperTestHarness { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) val zkUtils = 
ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.makeSurePersistentPathExists(path) fail("Failed to throw ConfigException for missing zookeeper root node") } catch { @@ -73,7 +73,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testMakeSurePersistsPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.makeSurePersistentPathExists(path) assertTrue("Failed to create persistent path", zkUtils.pathExists(path)) zkUtils.close() @@ -84,7 +84,7 @@ class ZKPathTest extends ZooKeeperTestHarness { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createEphemeralPathExpectConflict(path, "somedata") fail("Failed to throw ConfigException for missing zookeeper root node") } catch { @@ -97,7 +97,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testCreateEphemeralPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createEphemeralPathExpectConflict(path, "somedata") assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path)) zkUtils.close() @@ -109,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness { "test", "1")) val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, 
false) try { - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createSequentialPersistentPath(path) fail("Failed to throw ConfigException for missing zookeeper root node") } catch { @@ -122,7 +122,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testCreatePersistentSequentialExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - ZkPath.resetNamespaceCheckedState + zkUtils.zkPath.resetNamespaceCheckedState val actualPath = zkUtils.createSequentialPersistentPath(path) assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath)) zkUtils.close()
