[ https://issues.apache.org/jira/browse/KAFKA-7259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704294#comment-16704294 ]
ASF GitHub Bot commented on KAFKA-7259: --------------------------------------- omkreddy closed pull request #5480: KAFKA-7259: Remove deprecated ZKUtils usage from ZkSecurityMigrator URL: https://github.com/apache/kafka/pull/5480 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index a833db4dfd0..5cab801bf3f 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -17,9 +17,11 @@ package kafka.admin -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging, ZkUtils} +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging} +import kafka.zk.{KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.I0Itec.zkclient.exception.ZkException import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -92,8 +94,9 @@ object ZkSecurityMigrator extends Logging { val zkUrl = opts.options.valueOf(opts.zkUrlOpt) val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue - val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl) - val migrator = new ZkSecurityMigrator(zkUtils) + val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout, + Int.MaxValue, Time.SYSTEM) + val migrator = new ZkSecurityMigrator(zkClient) migrator.run() } @@ -120,20 +123,21 @@ object ZkSecurityMigrator extends Logging { } } -class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { +class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { + private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient) private val futures = new Queue[Future[String]] - private def setAcl(path: String, setPromise: Promise[String]) = { + private def setAcl(path: String, setPromise: Promise[String]): Unit = { info("Setting ACL for path %s".format(path)) - zkUtils.zkConnection.getZookeeper.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, setPromise) + zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise) } - private def getChildren(path: String, childrenPromise: Promise[String]) = { + private def getChildren(path: String, childrenPromise: Promise[String]): Unit = { info("Getting children to set ACLs for path %s".format(path)) - zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) + zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) } - private def setAclIndividually(path: String) = { + private def setAclIndividually(path: String): Unit = { val setPromise = Promise[String] futures.synchronized { futures += setPromise.future @@ -141,7 +145,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { setAcl(path, setPromise) } - private def setAclsRecursively(path: String) = { + private def setAclsRecursively(path: String): Unit = { val setPromise = Promise[String] val childrenPromise = Promise[String] futures.synchronized { @@ -157,7 +161,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { path: String, ctx: Object, children: java.util.List[String]) { - val zkHandle = zkUtils.zkConnection.getZookeeper + val zkHandle = zkSecurityMigratorUtils.currentZooKeeper val promise = ctx.asInstanceOf[Promise[String]] Code.get(rc) match { case Code.OK => @@ -191,7 +195,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { path: String, ctx: Object, stat: Stat) { - val zkHandle = zkUtils.zkConnection.getZookeeper + val zkHandle = zkSecurityMigratorUtils.currentZooKeeper val promise = ctx.asInstanceOf[Promise[String]] Code.get(rc) match { @@ -199,7 +203,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { info("Successfully set ACLs for %s".format(path)) promise success "done" case Code.CONNECTIONLOSS => - zkHandle.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, ctx) + zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx) case Code.NONODE => warn("Znode is gone, it could be have been legitimately deleted: %s".format(path)) promise success "done" @@ -218,9 +222,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { private def run(): Unit = { try { setAclIndividually("/") - for (path <- ZkUtils.SecureZkRootPaths) { + for (path <- ZkData.SecureRootPaths) { debug("Going to set ACL for %s".format(path)) - zkUtils.makeSurePersistentPathExists(path) + zkClient.makeSurePersistentPathExists(path) setAclsRecursively(path) } @@ -240,7 +244,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { recurse() } finally { - zkUtils.close + zkClient.close } } } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 4c8c4e1c28b..281e920fb32 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -20,7 +20,6 @@ package kafka.tools import java.io._ import java.nio.ByteBuffer -import joptsimple.OptionParser import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey} import kafka.coordinator.transaction.TransactionLog import kafka.log._ diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 4ad40ef6dad..732a8277d83 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -81,8 +81,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param data the znode data * @return the created path (including the appended monotonically increasing number) */ - private[zk] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = { - val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = { + val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow createResponse.name @@ -137,7 +137,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo try { val transaction = zooKeeperClient.createTransaction() transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), - acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL) + defaultAcls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL) transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion) val results = transaction.commit() val setDataResult = results.get(1).asInstanceOf[SetDataResult] @@ -214,7 +214,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val path = TopicPartitionStateZNode.path(partition) val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) - CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) + CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests.toSeq) } @@ -237,7 +237,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo */ def createControllerEpochRaw(epoch: Int): CreateResponse = { val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), - acls(ControllerEpochZNode.path), CreateMode.PERSISTENT) + defaultAcls(ControllerEpochZNode.path), CreateMode.PERSISTENT) retryRequestUntilConnected(createRequest) } @@ -384,7 +384,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo def createConfigChangeNotification(sanitizedEntityPath: String): Unit = { makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) val path = ConfigEntityChangeNotificationSequenceZNode.createPath - val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow() } @@ -803,7 +803,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } def create(reassignmentData: Array[Byte]): CreateResponse = { - val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path), + val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, defaultAcls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(createRequest) } @@ -832,9 +832,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = { - val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion, - zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) - retryRequestUntilConnected(deleteRequest) + deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion) } /** @@ -1114,7 +1112,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]): (Boolean, Int) = { def create(aclData: Array[Byte]): CreateResponse = { val path = ResourceZNode.path(resource) - val createRequest = CreateRequest(path, aclData, acls(path), CreateMode.PERSISTENT) + val createRequest = CreateRequest(path, aclData, defaultAcls(path), CreateMode.PERSISTENT) retryRequestUntilConnected(createRequest) } @@ -1134,7 +1132,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo */ def createAclChangeNotification(resource: Resource): Unit = { val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource) - val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) + val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.maybeThrow } @@ -1241,11 +1239,21 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo /** * Deletes the zk node recursively - * @param path - * @return return true if it succeeds, false otherwise + * @param path path to delete + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. + * @param recursiveDelete enable recursive delete + * @return KeeperException if there is an error while deleting the path */ - def deletePath(path: String): Boolean = { - deleteRecursive(path) + def deletePath(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion, recursiveDelete: Boolean = true): Unit = { + if (recursiveDelete) + deleteRecursive(path, expectedControllerEpochZkVersion) + else { + val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) + val deleteResponse = retryRequestUntilConnected(deleteRequest) + if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { + throw deleteResponse.resultException.get + } + } } /** @@ -1262,7 +1270,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo */ def createTokenChangeNotification(tokenId: String): Unit = { val path = DelegationTokenChangeNotificationSequenceZNode.createPath - val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) createResponse.resultException.foreach(e => throw e) } @@ -1283,7 +1291,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo def create(tokenData: Array[Byte]): CreateResponse = { val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId()) - val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT) + val createRequest = CreateRequest(path, tokenData, defaultAcls(path), CreateMode.PERSISTENT) retryRequestUntilConnected(createRequest) } @@ -1439,6 +1447,31 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } + /** + * Return the ACLs of the node of the given path + * @param path the given path for the node + * @return the ACL array of the given node. + */ + def getAcl(path: String): Seq[ACL] = { + val getAclRequest = GetAclRequest(path) + val getAclResponse = retryRequestUntilConnected(getAclRequest) + getAclResponse.resultCode match { + case Code.OK => getAclResponse.acl + case _ => throw getAclResponse.resultException.get + } + } + + /** + * sets the ACLs to the node of the given path + * @param path the given path for the node + * @param acl the given acl for the node + */ + def setAcl(path: String, acl: Seq[ACL]): Unit = { + val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion) + val setAclResponse = retryRequestUntilConnected(setAclRequest) + setAclResponse.maybeThrow + } + /** * Create the cluster Id. If the cluster id already exists, return the current cluster id. * @return cluster id @@ -1529,7 +1562,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } - private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = { + private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = { def parentPath(path: String): String = { val indexOfLastSlash = path.lastIndexOf("/") @@ -1538,7 +1571,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } def createRecursive0(path: String): Unit = { - val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT) + val createRequest = CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT) var createResponse = retryRequestUntilConnected(createRequest) if (createResponse.resultCode == Code.NONODE) { createRecursive0(parentPath(path)) @@ -1551,7 +1584,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } - val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT) + val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT) var createResponse = retryRequestUntilConnected(createRequest) if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) { @@ -1569,7 +1602,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = { val createRequests = partitions.map { partition => val path = TopicPartitionZNode.path(partition) - CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) + CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests) } @@ -1577,7 +1610,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = { val createRequests = topics.map { topic => val path = TopicPartitionsZNode.path(topic) - CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion)) + CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests) } @@ -1589,7 +1622,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo retryRequestsUntilConnected(getDataRequests) } - private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path) + def defaultAcls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path) + + def secure: Boolean = isSecure private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { retryRequestsUntilConnected(Seq(request)).head @@ -1655,7 +1690,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging { def create(): Code = { - val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL) + val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.EPHEMERAL) val createResponse = retryRequestUntilConnected(createRequest) val createResultCode = createResponse.resultCode match { case code@ Code.OK => diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala new file mode 100644 index 00000000000..31a7ba29073 --- /dev/null +++ b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.zk + +import org.apache.zookeeper.ZooKeeper + +/** + * This class should only be used in ZkSecurityMigrator tool. + * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API. + * @param kafkaZkClient + */ +class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) { + + def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper + +} diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index 5789d1ae8ca..c15a5080bf2 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -16,9 +16,8 @@ import java.io.File import java.util.Locale import kafka.server.KafkaConfig -import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils} +import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} @@ -29,6 +28,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaServerJaasEntryName = s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") + // disable secure acls of zkClient in ZooKeeperTestHarness + override protected def zkAclsEnabled = Some(false) override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) @@ -36,7 +37,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { @Before override def setUp(): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName)) super.setUp() } @@ -52,8 +53,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { */ @Test def testZkAclsDisabled() { - val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled)) - TestUtils.verifyUnsecureZkAcls(zkUtils) - CoreUtils.swallow(zkUtils.close(), this) + TestUtils.verifyUnsecureZkAcls(zkClient) } } diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index efb8c48c7c4..bbe0dd8356d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -22,12 +22,11 @@ import javax.security.auth.Subject import javax.security.auth.login.AppConfigurationEntry import kafka.server.KafkaConfig -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.utils.{TestUtils} import kafka.utils.JaasTestUtils._ import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.security.plain.PlainAuthenticateCallback import org.junit.Test @@ -134,8 +133,6 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes */ @Test def testAcls() { - val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled)) - TestUtils.verifySecureZkAcls(zkUtils, 1) - CoreUtils.swallow(zkUtils.close(), this) + TestUtils.verifySecureZkAcls(zkClient, 1) } } diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala index 9c6ae0b4199..20c28e76a20 100644 --- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -19,16 +19,16 @@ package kafka import java.io.{File, PrintWriter} import java.nio.file.{Files, StandardOpenOption} - import javax.imageio.ImageIO + import kafka.admin.ReassignPartitionsCommand import kafka.admin.ReassignPartitionsCommand.Throttle -import org.apache.kafka.common.TopicPartition import kafka.server.{KafkaConfig, KafkaServer, QuotaType} import kafka.utils.TestUtils._ import kafka.utils.{Exit, Logging, TestUtils, ZkUtils} import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness} import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition import org.jfree.chart.plot.PlotOrientation import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart} import org.jfree.data.xy.{XYSeries, XYSeriesCollection} diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 5d2d873425f..5cfab905777 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -20,10 +20,9 @@ import org.junit.Assert._ import org.junit.Test import kafka.utils.Logging import kafka.utils.TestUtils -import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness} +import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness} import kafka.server.ConfigType import kafka.admin.TopicCommand.TopicCommandOptions -import kafka.utils.ZkUtils.getDeleteTopicPath import org.apache.kafka.common.errors.TopicExistsException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.config.ConfigException @@ -80,7 +79,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT // delete the NormalTopic val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic)) - val deletePath = getDeleteTopicPath(normalTopic) + val deletePath = DeleteTopicsTopicZNode.path(normalTopic) assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath)) TopicCommand.deleteTopic(zkClient, deleteOpts) assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath)) @@ -93,7 +92,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME)) - val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME) + val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME) assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath)) intercept[AdminOperationException] { TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 1cdbe4b2a0e..de5ae22cba6 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -17,23 +17,32 @@ package kafka.security.auth +import java.nio.charset.StandardCharsets + import kafka.admin.ZkSecurityMigrator -import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} -import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness} -import org.apache.kafka.common.KafkaException +import kafka.utils.{Logging, TestUtils} +import kafka.zk._ +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.data.{ACL, Stat} import org.junit.Assert._ import org.junit.{After, Before, Test} -import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} import javax.security.auth.login.Configuration +import kafka.api.ApiVersion +import kafka.cluster.{Broker, EndPoint} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Time + +import scala.collection.JavaConverters._ +import scala.collection.Seq + class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections) val authProvider = "zookeeper.authProvider.1" - var zkUtils: ZkUtils = null @Before override def setUp() { @@ -41,13 +50,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { Configuration.setConfiguration(null) System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") super.setUp() - zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled)) } @After override def tearDown() { - if (zkUtils != null) - CoreUtils.swallow(zkUtils.close(), this) super.tearDown() System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) System.clearProperty(authProvider) @@ -59,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * secure ACLs and authentication with ZooKeeper. */ @Test - def testIsZkSecurityEnabled() { + def testIsZkSecurityEnabled(): Unit = { assertTrue(JaasUtils.isZkSecurityEnabled()) Configuration.setConfiguration(null) System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) @@ -75,59 +81,76 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { } /** - * Exercises the code in ZkUtils. The goal is mainly - * to verify that the behavior of ZkUtils is correct + * Exercises the code in KafkaZkClient. The goal is mainly + * to verify that the behavior of KafkaZkClient is correct * when isSecure is set to true. */ @Test - def testZkUtils() { - assertTrue(zkUtils.isSecure) - for (path <- zkUtils.persistentZkPaths) { - zkUtils.makeSurePersistentPathExists(path) - if (ZkUtils.sensitivePath(path)) { - val aclList = zkUtils.zkConnection.getAcl(path).getKey + def testKafkaZkClient(): Unit = { + assertTrue(zkClient.secure) + for (path <- ZkData.PersistentZkPaths) { + zkClient.makeSurePersistentPathExists(path) + if (ZkData.sensitivePath(path)) { + val aclList = zkClient.getAcl(path) assertEquals(s"Unexpected acl list size for $path", 1, aclList.size) - for (acl <- aclList.asScala) + for (acl <- aclList) assertTrue(TestUtils.isAclSecure(acl, sensitive = true)) - } else if (!path.equals(ZkUtils.ConsumersPath)) { - val aclList = zkUtils.zkConnection.getAcl(path).getKey + } else if (!path.equals(ConsumerPathZNode.path)) { + val aclList = zkClient.getAcl(path) assertEquals(s"Unexpected acl list size for $path", 2, aclList.size) - for (acl <- aclList.asScala) + for (acl <- aclList) assertTrue(TestUtils.isAclSecure(acl, sensitive = false)) } } - // Test that can create: createEphemeralPathExpectConflict - zkUtils.createEphemeralPathExpectConflict("/a", "") - verify("/a") - // Test that can create: createPersistentPath - zkUtils.createPersistentPath("/b") - verify("/b") + + // Test that creates Ephemeral node + val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT) + zkClient.registerBroker(brokerInfo) + verify(brokerInfo.path) + + // Test that creates persistent nodes + val topic1 = "topic1" + val assignment = Map( + new TopicPartition(topic1, 0) -> Seq(0, 1), + new TopicPartition(topic1, 1) -> Seq(0, 1), + new TopicPartition(topic1, 2) -> Seq(1, 2, 3) + ) + + // create a topic assignment + zkClient.createTopicAssignment(topic1, assignment) + verify(TopicZNode.path(topic1)) + // Test that can create: createSequentialPersistentPath - val seqPath = zkUtils.createSequentialPersistentPath("/c", "") + val seqPath = zkClient.createSequentialPersistentPath("/c", "".getBytes(StandardCharsets.UTF_8)) verify(seqPath) - // Test that can update: updateEphemeralPath - zkUtils.updateEphemeralPath("/a", "updated") - val valueA: String = zkUtils.zkClient.readData("/a") - assertTrue(valueA.equals("updated")) - // Test that can update: updatePersistentPath - zkUtils.updatePersistentPath("/b", "updated") - val valueB: String = zkUtils.zkClient.readData("/b") - assertTrue(valueB.equals("updated")) - info("Leaving testZkUtils") + // Test that can update Ephemeral node + val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL) + zkClient.updateBrokerInfo(updatedBrokerInfo) + assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1)) + + // Test that can update persistent nodes + val updatedAssignment = assignment - new TopicPartition(topic1, 2) + zkClient.setTopicAssignment(topic1, updatedAssignment) + assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get) } + private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol, + rack: Option[String] = None): BrokerInfo = + BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol + (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10) + /** * Tests the migration tool when making an unsecure * cluster secure. */ @Test - def testZkMigration() { - val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) + def testZkMigration(): Unit = { + val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) try { - testMigration(zkConnect, unsecureZkUtils, zkUtils) + testMigration(zkConnect, unsecureZkClient, zkClient) } finally { - unsecureZkUtils.close() + unsecureZkClient.close() } } @@ -136,12 +159,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * cluster unsecure. */ @Test - def testZkAntiMigration() { - val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) + def testZkAntiMigration(): Unit = { + val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) try { - testMigration(zkConnect, zkUtils, unsecureZkUtils) + testMigration(zkConnect, zkClient, unsecureZkClient) } finally { - unsecureZkUtils.close() + unsecureZkClient.close() } } @@ -149,42 +172,42 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * Tests that the persistent paths cannot be deleted. */ @Test - def testDelete() { + def testDelete(): Unit = { info(s"zkConnect string: $zkConnect") ZkSecurityMigrator.run(Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect")) deleteAllUnsecure() } /** - * Tests that znodes cannot be deleted when the + * Tests that znodes cannot be deleted when the * persistent paths have children. */ @Test - def testDeleteRecursive() { + def testDeleteRecursive(): Unit = { info(s"zkConnect string: $zkConnect") - for (path <- ZkUtils.SecureZkRootPaths) { + for (path <- ZkData.SecureRootPaths) { info(s"Creating $path") - zkUtils.makeSurePersistentPathExists(path) - zkUtils.createPersistentPath(s"$path/fpjwashere", "") + zkClient.makeSurePersistentPathExists(path) + zkClient.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8)) } - zkUtils.zkConnection.setAcl("/", zkUtils.defaultAcls("/"), -1) + zkClient.setAcl("/", zkClient.defaultAcls("/")) deleteAllUnsecure() } - + /** * Tests the migration tool when chroot is being used. */ @Test def testChroot(): Unit = { val zkUrl = zkConnect + "/kafka" - zkUtils.createPersistentPath("/kafka") - val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false) - val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true) + zkClient.createRecursive("/kafka") + val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) + val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM) try { - testMigration(zkUrl, unsecureZkUtils, secureZkUtils) + testMigration(zkUrl, unsecureZkClient, secureZkClient) } finally { - unsecureZkUtils.close() - secureZkUtils.close() + unsecureZkClient.close() + secureZkClient.close() } } @@ -192,62 +215,62 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * Exercises the migration tool. It is used in these test cases: * testZkMigration, testZkAntiMigration, testChroot. */ - private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) { + private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = { info(s"zkConnect string: $zkUrl") - for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) { + for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) { info(s"Creating $path") firstZk.makeSurePersistentPathExists(path) // Create a child for each znode to exercise the recurrent // traversal of the data tree - firstZk.createPersistentPath(s"$path/fpjwashere", "") + firstZk.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8)) } // Getting security option to determine how to verify ACLs. // Additionally, we create the consumers znode (not in // securePersistentZkPaths) to make sure that we don't // add ACLs to it. val secureOpt: String = - if (secondZk.isSecure) { - firstZk.createPersistentPath(ZkUtils.ConsumersPath) + if (secondZk.secure) { + firstZk.createRecursive(ConsumerPathZNode.path) "secure" } else { - secondZk.createPersistentPath(ZkUtils.ConsumersPath) + secondZk.createRecursive(ConsumerPathZNode.path) "unsecure" } ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl")) info("Done with migration") - for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) { - val sensitive = ZkUtils.sensitivePath(path) - val listParent = secondZk.zkConnection.getAcl(path).getKey - assertTrue(path, isAclCorrect(listParent, secondZk.isSecure, sensitive)) + for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) { + val sensitive = ZkData.sensitivePath(path) + val listParent = secondZk.getAcl(path) + assertTrue(path, isAclCorrect(listParent, secondZk.secure, sensitive)) val childPath = path + "/fpjwashere" - val listChild = secondZk.zkConnection.getAcl(childPath).getKey - assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure, sensitive)) + val listChild = secondZk.getAcl(childPath) + assertTrue(childPath, isAclCorrect(listChild, secondZk.secure, sensitive)) } // Check consumers path. - val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey - assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false, false)) + val consumersAcl = firstZk.getAcl(ConsumerPathZNode.path) + assertTrue(ConsumerPathZNode.path, isAclCorrect(consumersAcl, false, false)) } /** * Verifies that the path has the appropriate secure ACL. */ - private def verify(path: String): Boolean = { - val sensitive = ZkUtils.sensitivePath(path) - val list = zkUtils.zkConnection.getAcl(path).getKey - list.asScala.forall(TestUtils.isAclSecure(_, sensitive)) + private def verify(path: String): Unit = { + val sensitive = ZkData.sensitivePath(path) + val list = zkClient.getAcl(path) + assertTrue(list.forall(TestUtils.isAclSecure(_, sensitive))) } /** * Verifies ACL. */ - private def isAclCorrect(list: java.util.List[ACL], secure: Boolean, sensitive: Boolean): Boolean = { + private def isAclCorrect(list: Seq[ACL], secure: Boolean, sensitive: Boolean): Boolean = { val isListSizeCorrect = if (secure && !sensitive) list.size == 2 else list.size == 1 - isListSizeCorrect && list.asScala.forall( + isListSizeCorrect && list.forall( if (secure) TestUtils.isAclSecure(_, sensitive) else @@ -260,14 +283,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { * This is used in the testDelete and testDeleteRecursive * test cases. */ - private def deleteAllUnsecure() { + private def deleteAllUnsecure(): Unit = { System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false") - val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) + val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM) val result: Try[Boolean] = { - deleteRecursive(unsecureZkUtils, "/") + deleteRecursive(unsecureZkClient, "/") } // Clean up before leaving the test case - unsecureZkUtils.close() + unsecureZkClient.close() System.clearProperty(JaasUtils.ZK_SASL_CLIENT) // Fail the test if able to delete @@ -280,13 +303,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { /** * Tries to delete znodes recursively */ - private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = { + private def deleteRecursive(zkClient: KafkaZkClient, path: String): Try[Boolean] = { info(s"Deleting $path") var result: Try[Boolean] = Success(true) - for (child <- zkUtils.getChildren(path)) + for (child <- zkClient.getChildren(path)) result = (path match { - case "/" => deleteRecursive(zkUtils, s"/$child") - case path => deleteRecursive(zkUtils, s"$path/$child") + case "/" => deleteRecursive(zkClient, s"/$child") + case path => deleteRecursive(zkClient, s"$path/$child") }) match { case Success(_) => result case Failure(e) => Failure(e) @@ -297,7 +320,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { // For all other paths, try to delete it case path => try { - zkUtils.deletePath(path) + zkClient.deletePath(path, recursiveDelete = false) Failure(new Exception(s"Have been able to delete $path")) } catch { case _: Exception => result diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c3e7312f5af..b5a7583723e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -26,8 +26,8 @@ import java.security.cert.X509Certificate import java.time.Duration import java.util.{Collections, Properties} import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit} - import javax.net.ssl.X509TrustManager + import kafka.api._ import kafka.cluster.{Broker, EndPoint} import kafka.log._ @@ -1132,30 +1132,30 @@ object TestUtils extends Logging { } } - private def secureZkPaths(zkUtils: ZkUtils): Seq[String] = { + private def secureZkPaths(zkClient: KafkaZkClient): Seq[String] = { def subPaths(path: String): Seq[String] = { - if (zkUtils.pathExists(path)) - path +: zkUtils.getChildren(path).map(c => path + "/" + c).flatMap(subPaths) + if (zkClient.pathExists(path)) + path +: zkClient.getChildren(path).map(c => path + "/" + c).flatMap(subPaths) else Seq.empty } - val topLevelPaths = ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths + val topLevelPaths = ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths topLevelPaths.flatMap(subPaths) } /** * Verifies that all secure paths in ZK are created with the expected ACL. */ - def verifySecureZkAcls(zkUtils: ZkUtils, usersWithAccess: Int) { - secureZkPaths(zkUtils).foreach(path => { - if (zkUtils.pathExists(path)) { - val sensitive = ZkUtils.sensitivePath(path) + def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: Int) { + secureZkPaths(zkClient).foreach(path => { + if (zkClient.pathExists(path)) { + val sensitive = ZkData.sensitivePath(path) // usersWithAccess have ALL access to path. For paths that are // not sensitive, world has READ access. val aclCount = if (sensitive) usersWithAccess else usersWithAccess + 1 - val acls = zkUtils.zkConnection.getAcl(path).getKey + val acls = zkClient.getAcl(path) assertEquals(s"Invalid ACLs for $path $acls", aclCount, acls.size) - acls.asScala.foreach(acl => isAclSecure(acl, sensitive)) + acls.foreach(acl => isAclSecure(acl, sensitive)) } }) } @@ -1164,12 +1164,12 @@ object TestUtils extends Logging { * Verifies that secure paths in ZK have no access control. This is * the case when zookeeper.set.acl=false and no ACLs have been configured. */ - def verifyUnsecureZkAcls(zkUtils: ZkUtils) { - secureZkPaths(zkUtils).foreach(path => { - if (zkUtils.pathExists(path)) { - val acls = zkUtils.zkConnection.getAcl(path).getKey + def verifyUnsecureZkAcls(zkClient: KafkaZkClient) { + secureZkPaths(zkClient).foreach(path => { + if (zkClient.pathExists(path)) { + val acls = zkClient.getAcl(path) assertEquals(s"Invalid ACLs for $path $acls", 1, acls.size) - acls.asScala.foreach(isAclUnsecure) + acls.foreach(isAclUnsecure) } }) } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 6de91597df9..a8df342998c 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -44,6 +44,7 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper._ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.data.Stat class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -543,6 +544,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createRecursive(path) zkClient.deletePath(path) assertFalse(zkClient.pathExists(path)) + + zkClient.createRecursive(path) + zkClient.deletePath("/a") + assertFalse(zkClient.pathExists(path)) + + zkClient.createRecursive(path) + zkClient.deletePath(path, recursiveDelete = false) + assertFalse(zkClient.pathExists(path)) + assertTrue(zkClient.pathExists("/a/b")) } @Test @@ -1143,6 +1153,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath) } + @Test + def testAclMethods(): Unit = { + val mockPath = "/foo" + + intercept[NoNodeException] { + zkClient.getAcl(mockPath) + } + + intercept[NoNodeException] { + zkClient.setAcl(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala) + } + + zkClient.createRecursive(mockPath) + + zkClient.setAcl(mockPath, ZooDefs.Ids.READ_ACL_UNSAFE.asScala) + + assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath)) + } + class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends KafkaZkClient(zooKeeperClient, isSecure, time) { // Overwriting this method from the parent class to force the client to re-register the Broker. diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 8d34c489406..2f75fa27fc8 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -43,7 +43,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { val zkSessionTimeout = 6000 val zkMaxInFlightRequests = Int.MaxValue - protected val zkAclsEnabled: Option[Boolean] = None + protected def zkAclsEnabled: Option[Boolean] = None var zkClient: KafkaZkClient = null var adminZkClient: AdminZkClient = null ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated ZKUtils usage from ZkSecurityMigrator > ------------------------------------------------------- > > Key: KAFKA-7259 > URL: https://issues.apache.org/jira/browse/KAFKA-7259 > Project: Kafka > Issue Type: Task > Components: core > Reporter: Manikumar > Assignee: Manikumar > Priority: Minor > Fix For: 2.2.0 > > > ZkSecurityMigrator code currently uses ZKUtils. We can replace ZKUtils usage > with KafkaZkClient. Also remove usage of ZKUtils from various tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)