[ https://issues.apache.org/jira/browse/KAFKA-7259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704294#comment-16704294 ]

ASF GitHub Bot commented on KAFKA-7259:
---------------------------------------

omkreddy closed pull request #5480: KAFKA-7259: Remove deprecated ZKUtils usage from ZkSecurityMigrator
URL: https://github.com/apache/kafka/pull/5480
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once such a pull request is merged, it is reproduced below
for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a833db4dfd0..5cab801bf3f 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,9 +17,11 @@
 
 package kafka.admin
 
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
+import kafka.zk.{KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
 import org.I0Itec.zkclient.exception.ZkException
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
@@ -92,8 +94,9 @@ object ZkSecurityMigrator extends Logging {
     val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
     val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
     val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
-    val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
-    val migrator = new ZkSecurityMigrator(zkUtils)
+    val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
+      Int.MaxValue, Time.SYSTEM)
+    val migrator = new ZkSecurityMigrator(zkClient)
     migrator.run()
   }
 
@@ -120,20 +123,21 @@ object ZkSecurityMigrator extends Logging {
   }
 }
 
-class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
+class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
+  private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient)
   private val futures = new Queue[Future[String]]
 
-  private def setAcl(path: String, setPromise: Promise[String]) = {
+  private def setAcl(path: String, setPromise: Promise[String]): Unit = {
     info("Setting ACL for path %s".format(path))
-    zkUtils.zkConnection.getZookeeper.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, setPromise)
+    zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise)
   }
 
-  private def getChildren(path: String, childrenPromise: Promise[String]) = {
+  private def getChildren(path: String, childrenPromise: Promise[String]): Unit = {
     info("Getting children to set ACLs for path %s".format(path))
-    zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
+    zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
   }
 
-  private def setAclIndividually(path: String) = {
+  private def setAclIndividually(path: String): Unit = {
     val setPromise = Promise[String]
     futures.synchronized {
       futures += setPromise.future
@@ -141,7 +145,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
     setAcl(path, setPromise)
   }
 
-  private def setAclsRecursively(path: String) = {
+  private def setAclsRecursively(path: String): Unit = {
     val setPromise = Promise[String]
     val childrenPromise = Promise[String]
     futures.synchronized {
@@ -157,7 +161,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
                       path: String,
                       ctx: Object,
                       children: java.util.List[String]) {
-      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
       val promise = ctx.asInstanceOf[Promise[String]]
       Code.get(rc) match {
         case Code.OK =>
@@ -191,7 +195,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
                       path: String,
                       ctx: Object,
                       stat: Stat) {
-      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
       val promise = ctx.asInstanceOf[Promise[String]]
 
       Code.get(rc) match {
@@ -199,7 +203,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
           info("Successfully set ACLs for %s".format(path))
           promise success "done"
         case Code.CONNECTIONLOSS =>
-            zkHandle.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, ctx)
+            zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx)
         case Code.NONODE =>
           warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
           promise success "done"
@@ -218,9 +222,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
   private def run(): Unit = {
     try {
       setAclIndividually("/")
-      for (path <- ZkUtils.SecureZkRootPaths) {
+      for (path <- ZkData.SecureRootPaths) {
         debug("Going to set ACL for %s".format(path))
-        zkUtils.makeSurePersistentPathExists(path)
+        zkClient.makeSurePersistentPathExists(path)
         setAclsRecursively(path)
       }
 
@@ -240,7 +244,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
       recurse()
 
     } finally {
-      zkUtils.close
+      zkClient.close
     }
   }
 }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 4c8c4e1c28b..281e920fb32 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -20,7 +20,6 @@ package kafka.tools
 import java.io._
 import java.nio.ByteBuffer
 
-import joptsimple.OptionParser
 import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
 import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 4ad40ef6dad..732a8277d83 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -81,8 +81,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param data the znode data
    * @return the created path (including the appended monotonically increasing number)
    */
-  private[zk] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
-    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+  private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
+    val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
     createResponse.name
@@ -137,7 +137,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
       try {
         val transaction = zooKeeperClient.createTransaction()
         transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp),
-          acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
+          defaultAcls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
         transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)
         val results = transaction.commit()
         val setDataResult = results.get(1).asInstanceOf[SetDataResult]
@@ -214,7 +214,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
       val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests.toSeq)
   }
@@ -237,7 +237,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createControllerEpochRaw(epoch: Int): CreateResponse = {
     val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
-      acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
+      defaultAcls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
     retryRequestUntilConnected(createRequest)
   }
 
@@ -384,7 +384,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
     makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     val path = ConfigEntityChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow()
   }
@@ -803,7 +803,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
 
     def create(reassignmentData: Array[Byte]): CreateResponse = {
-      val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path),
+      val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, defaultAcls(ReassignPartitionsZNode.path),
        CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
       retryRequestUntilConnected(createRequest)
     }
@@ -832,9 +832,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
   def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion,
-      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
-    retryRequestUntilConnected(deleteRequest)
+    deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion)
   }
 
   /**
@@ -1114,7 +1112,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]): (Boolean, Int) = {
     def create(aclData: Array[Byte]): CreateResponse = {
       val path = ResourceZNode.path(resource)
-      val createRequest = CreateRequest(path, aclData, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, aclData, defaultAcls(path), CreateMode.PERSISTENT)
       retryRequestUntilConnected(createRequest)
     }
 
@@ -1134,7 +1132,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createAclChangeNotification(resource: Resource): Unit = {
     val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource)
-    val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
   }
@@ -1241,11 +1239,21 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Deletes the zk node recursively
-   * @param path
-   * @return  return true if it succeeds, false otherwise
+   * @param path path to delete
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
+   * @param recursiveDelete enable recursive delete
+   * @return KeeperException if there is an error while deleting the path
    */
-  def deletePath(path: String): Boolean = {
-    deleteRecursive(path)
+  def deletePath(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion, recursiveDelete: Boolean = true): Unit = {
+    if (recursiveDelete)
+      deleteRecursive(path, expectedControllerEpochZkVersion)
+    else {
+      val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      val deleteResponse = retryRequestUntilConnected(deleteRequest)
+      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+          throw deleteResponse.resultException.get
+      }
+    }
   }
 
   /**
@@ -1262,7 +1270,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createTokenChangeNotification(tokenId: String): Unit = {
     val path = DelegationTokenChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.resultException.foreach(e => throw e)
   }
@@ -1283,7 +1291,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
     def create(tokenData: Array[Byte]): CreateResponse = {
       val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId())
-      val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, tokenData, defaultAcls(path), CreateMode.PERSISTENT)
       retryRequestUntilConnected(createRequest)
     }
 
@@ -1439,6 +1447,31 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  /**
+    * Return the ACLs of the node of the given path
+    * @param path the given path for the node
+    * @return the ACL array of the given node.
+    */
+  def getAcl(path: String): Seq[ACL] = {
+    val getAclRequest = GetAclRequest(path)
+    val getAclResponse = retryRequestUntilConnected(getAclRequest)
+    getAclResponse.resultCode match {
+      case Code.OK => getAclResponse.acl
+      case _ => throw getAclResponse.resultException.get
+    }
+  }
+
+  /**
+    * sets the ACLs to the node of the given path
+    * @param path the given path for the node
+    * @param acl the given acl for the node
+    */
+  def setAcl(path: String, acl: Seq[ACL]): Unit = {
+    val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion)
+    val setAclResponse = retryRequestUntilConnected(setAclRequest)
+    setAclResponse.maybeThrow
+  }
+
   /**
     * Create the cluster Id. If the cluster id already exists, return the current cluster id.
     * @return  cluster id
@@ -1529,7 +1562,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
-  private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+  private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
 
     def parentPath(path: String): String = {
       val indexOfLastSlash = path.lastIndexOf("/")
@@ -1538,7 +1571,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
 
     def createRecursive0(path: String): Unit = {
-      val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT)
       var createResponse = retryRequestUntilConnected(createRequest)
       if (createResponse.resultCode == Code.NONODE) {
         createRecursive0(parentPath(path))
@@ -1551,7 +1584,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
       }
     }
 
-    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+    val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT)
     var createResponse = retryRequestUntilConnected(createRequest)
 
     if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
@@ -1569,7 +1602,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
     val createRequests = partitions.map { partition =>
       val path = TopicPartitionZNode.path(partition)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
@@ -1577,7 +1610,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = {
     val createRequests = topics.map { topic =>
       val path = TopicPartitionsZNode.path(topic)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
@@ -1589,7 +1622,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestsUntilConnected(getDataRequests)
   }
 
-  private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+  def defaultAcls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+
+  def secure: Boolean = isSecure
 
   private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
     retryRequestsUntilConnected(Seq(request)).head
@@ -1655,7 +1690,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
     def create(): Code = {
-      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
+      val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.EPHEMERAL)
       val createResponse = retryRequestUntilConnected(createRequest)
       val createResultCode = createResponse.resultCode match {
         case code@ Code.OK =>
diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
new file mode 100644
index 00000000000..31a7ba29073
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import org.apache.zookeeper.ZooKeeper
+
+/**
+ * This class should only be used in ZkSecurityMigrator tool.
+ * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API.
+ * @param kafkaZkClient
+ */
+class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) {
+
+  def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper
+
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 5789d1ae8ca..c15a5080bf2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -16,9 +16,8 @@ import java.io.File
 import java.util.Locale
 
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
+import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 
@@ -29,6 +28,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   private val kafkaServerJaasEntryName =
     s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  // disable secure acls of zkClient in ZooKeeperTestHarness
+  override protected def zkAclsEnabled = Some(false)
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
@@ -36,7 +37,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
-    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName))
     super.setUp()
   }
 
@@ -52,8 +53,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
    */
   @Test
   def testZkAclsDisabled() {
-    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-    TestUtils.verifyUnsecureZkAcls(zkUtils)
-    CoreUtils.swallow(zkUtils.close(), this)
+    TestUtils.verifyUnsecureZkAcls(zkClient)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index efb8c48c7c4..bbe0dd8356d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -22,12 +22,11 @@ import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils}
 import kafka.utils.JaasTestUtils._
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
 import org.junit.Test
@@ -134,8 +133,6 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
    */
   @Test
   def testAcls() {
-    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-    TestUtils.verifySecureZkAcls(zkUtils, 1)
-    CoreUtils.swallow(zkUtils.close(), this)
+    TestUtils.verifySecureZkAcls(zkClient, 1)
   }
 }
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 9c6ae0b4199..20c28e76a20 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -19,16 +19,16 @@ package kafka
 
 import java.io.{File, PrintWriter}
 import java.nio.file.{Files, StandardOpenOption}
-
 import javax.imageio.ImageIO
+
 import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
-import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
 import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
 import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
 import org.jfree.chart.plot.PlotOrientation
 import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart}
 import org.jfree.data.xy.{XYSeries, XYSeriesCollection}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5d2d873425f..5cfab905777 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -20,10 +20,9 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.Logging
 import kafka.utils.TestUtils
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness}
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils.getDeleteTopicPath
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.config.ConfigException
@@ -80,7 +79,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
 
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
-    val deletePath = getDeleteTopicPath(normalTopic)
+    val deletePath = DeleteTopicsTopicZNode.path(normalTopic)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
     TopicCommand.deleteTopic(zkClient, deleteOpts)
     assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))
@@ -93,7 +92,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
 
     // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
-    val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
+    val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
       TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 1cdbe4b2a0e..de5ae22cba6 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -17,23 +17,32 @@
 
 package kafka.security.auth
 
+import java.nio.charset.StandardCharsets
+
 import kafka.admin.ZkSecurityMigrator
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
-import org.apache.kafka.common.KafkaException
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk._
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
-import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 import javax.security.auth.login.Configuration
 
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
 class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
-  var zkUtils: ZkUtils = null
 
   @Before
   override def setUp() {
@@ -41,13 +50,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     Configuration.setConfiguration(null)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
     super.setUp()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
   }
 
   @After
   override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     System.clearProperty(authProvider)
@@ -59,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * secure ACLs and authentication with ZooKeeper.
    */
   @Test
-  def testIsZkSecurityEnabled() {
+  def testIsZkSecurityEnabled(): Unit = {
     assertTrue(JaasUtils.isZkSecurityEnabled())
     Configuration.setConfiguration(null)
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
@@ -75,59 +81,76 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   }
 
   /**
-   * Exercises the code in ZkUtils. The goal is mainly
-   * to verify that the behavior of ZkUtils is correct
+   * Exercises the code in KafkaZkClient. The goal is mainly
+   * to verify that the behavior of KafkaZkClient is correct
    * when isSecure is set to true.
    */
   @Test
-  def testZkUtils() {
-    assertTrue(zkUtils.isSecure)
-    for (path <- zkUtils.persistentZkPaths) {
-      zkUtils.makeSurePersistentPathExists(path)
-      if (ZkUtils.sensitivePath(path)) {
-        val aclList = zkUtils.zkConnection.getAcl(path).getKey
+  def testKafkaZkClient(): Unit = {
+    assertTrue(zkClient.secure)
+    for (path <- ZkData.PersistentZkPaths) {
+      zkClient.makeSurePersistentPathExists(path)
+      if (ZkData.sensitivePath(path)) {
+        val aclList = zkClient.getAcl(path)
         assertEquals(s"Unexpected acl list size for $path", 1, aclList.size)
-        for (acl <- aclList.asScala)
+        for (acl <- aclList)
           assertTrue(TestUtils.isAclSecure(acl, sensitive = true))
-      } else if (!path.equals(ZkUtils.ConsumersPath)) {
-        val aclList = zkUtils.zkConnection.getAcl(path).getKey
+      } else if (!path.equals(ConsumerPathZNode.path)) {
+        val aclList = zkClient.getAcl(path)
         assertEquals(s"Unexpected acl list size for $path", 2, aclList.size)
-        for (acl <- aclList.asScala)
+        for (acl <- aclList)
           assertTrue(TestUtils.isAclSecure(acl, sensitive = false))
       }
     }
-    // Test that can create: createEphemeralPathExpectConflict
-    zkUtils.createEphemeralPathExpectConflict("/a", "")
-    verify("/a")
-    // Test that can create: createPersistentPath
-    zkUtils.createPersistentPath("/b")
-    verify("/b")
+
+    // Test that creates Ephemeral node
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    zkClient.registerBroker(brokerInfo)
+    verify(brokerInfo.path)
+
+    // Test that creates persistent nodes
+    val topic1 = "topic1"
+    val assignment = Map(
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      new TopicPartition(topic1, 1) -> Seq(0, 1),
+      new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
+    )
+
+    // create a topic assignment
+    zkClient.createTopicAssignment(topic1, assignment)
+    verify(TopicZNode.path(topic1))
+
     // Test that can create: createSequentialPersistentPath
-    val seqPath = zkUtils.createSequentialPersistentPath("/c", "")
+    val seqPath = zkClient.createSequentialPersistentPath("/c", "".getBytes(StandardCharsets.UTF_8))
     verify(seqPath)
-    // Test that can update: updateEphemeralPath
-    zkUtils.updateEphemeralPath("/a", "updated")
-    val valueA: String = zkUtils.zkClient.readData("/a")
-    assertTrue(valueA.equals("updated"))
-    // Test that can update: updatePersistentPath
-    zkUtils.updatePersistentPath("/b", "updated")
-    val valueB: String = zkUtils.zkClient.readData("/b")
-    assertTrue(valueB.equals("updated"))
 
-    info("Leaving testZkUtils")
+    // Test that can update Ephemeral node
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfo(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Test that can update persistent nodes
+    val updatedAssignment = assignment - new TopicPartition(topic1, 2)
+    zkClient.setTopicAssignment(topic1, updatedAssignment)
+    assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
   }
 
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
   /**
    * Tests the migration tool when making an unsecure
    * cluster secure.
    */
   @Test
-  def testZkMigration() {
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) 
+  def testZkMigration(): Unit = {
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkConnect, unsecureZkUtils, zkUtils)
+      testMigration(zkConnect, unsecureZkClient, zkClient)
     } finally {
-      unsecureZkUtils.close()
+      unsecureZkClient.close()
     }
   }
 
@@ -136,12 +159,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * cluster unsecure.
    */
   @Test
-  def testZkAntiMigration() {
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+  def testZkAntiMigration(): Unit = {
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkConnect, zkUtils, unsecureZkUtils)
+      testMigration(zkConnect, zkClient, unsecureZkClient)
     } finally {
-      unsecureZkUtils.close()
+      unsecureZkClient.close()
     }
   }
 
@@ -149,42 +172,42 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Tests that the persistent paths cannot be deleted.
    */
   @Test
-  def testDelete() {
+  def testDelete(): Unit = {
     info(s"zkConnect string: $zkConnect")
     ZkSecurityMigrator.run(Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect"))
     deleteAllUnsecure()
   }
 
   /**
-   * Tests that znodes cannot be deleted when the 
+   * Tests that znodes cannot be deleted when the
    * persistent paths have children.
    */
   @Test
-  def testDeleteRecursive() {
+  def testDeleteRecursive(): Unit = {
     info(s"zkConnect string: $zkConnect")
-    for (path <- ZkUtils.SecureZkRootPaths) {
+    for (path <- ZkData.SecureRootPaths) {
       info(s"Creating $path")
-      zkUtils.makeSurePersistentPathExists(path)
-      zkUtils.createPersistentPath(s"$path/fpjwashere", "")
+      zkClient.makeSurePersistentPathExists(path)
+      zkClient.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
     }
-    zkUtils.zkConnection.setAcl("/", zkUtils.defaultAcls("/"), -1)
+    zkClient.setAcl("/", zkClient.defaultAcls("/"))
     deleteAllUnsecure()
   }
-  
+
   /**
    * Tests the migration tool when chroot is being used.
    */
   @Test
   def testChroot(): Unit = {
     val zkUrl = zkConnect + "/kafka"
-    zkUtils.createPersistentPath("/kafka")
-    val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)
-    val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true)
+    zkClient.createRecursive("/kafka")
+    val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkUrl, unsecureZkUtils, secureZkUtils)
+      testMigration(zkUrl, unsecureZkClient, secureZkClient)
     } finally {
-      unsecureZkUtils.close()
-      secureZkUtils.close()
+      unsecureZkClient.close()
+      secureZkClient.close()
     }
   }
 
@@ -192,62 +215,62 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Exercises the migration tool. It is used in these test cases:
    * testZkMigration, testZkAntiMigration, testChroot.
    */
-  private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
+  private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = {
     info(s"zkConnect string: $zkUrl")
-    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
+    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
       // Create a child for each znode to exercise the recurrent
       // traversal of the data tree
-      firstZk.createPersistentPath(s"$path/fpjwashere", "")
+      firstZk.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
     }
     // Getting security option to determine how to verify ACLs.
     // Additionally, we create the consumers znode (not in
     // securePersistentZkPaths) to make sure that we don't
     // add ACLs to it.
     val secureOpt: String =
-      if (secondZk.isSecure) {
-        firstZk.createPersistentPath(ZkUtils.ConsumersPath)
+      if (secondZk.secure) {
+        firstZk.createRecursive(ConsumerPathZNode.path)
         "secure"
       } else {
-        secondZk.createPersistentPath(ZkUtils.ConsumersPath)
+        secondZk.createRecursive(ConsumerPathZNode.path)
         "unsecure"
       }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
-    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
-      val sensitive = ZkUtils.sensitivePath(path)
-      val listParent = secondZk.zkConnection.getAcl(path).getKey
-      assertTrue(path, isAclCorrect(listParent, secondZk.isSecure, sensitive))
+    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+      val sensitive = ZkData.sensitivePath(path)
+      val listParent = secondZk.getAcl(path)
+      assertTrue(path, isAclCorrect(listParent, secondZk.secure, sensitive))
 
       val childPath = path + "/fpjwashere"
-      val listChild = secondZk.zkConnection.getAcl(childPath).getKey
-      assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure, sensitive))
+      val listChild = secondZk.getAcl(childPath)
+      assertTrue(childPath, isAclCorrect(listChild, secondZk.secure, sensitive))
     }
     // Check consumers path.
-    val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
-    assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false, false))
+    val consumersAcl = firstZk.getAcl(ConsumerPathZNode.path)
+    assertTrue(ConsumerPathZNode.path, isAclCorrect(consumersAcl, false, false))
   }
 
   /**
    * Verifies that the path has the appropriate secure ACL.
    */
-  private def verify(path: String): Boolean = {
-    val sensitive = ZkUtils.sensitivePath(path)
-    val list = zkUtils.zkConnection.getAcl(path).getKey
-    list.asScala.forall(TestUtils.isAclSecure(_, sensitive))
+  private def verify(path: String): Unit = {
+    val sensitive = ZkData.sensitivePath(path)
+    val list = zkClient.getAcl(path)
+    assertTrue(list.forall(TestUtils.isAclSecure(_, sensitive)))
   }
 
   /**
    * Verifies ACL.
    */
-  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
+  private def isAclCorrect(list: Seq[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
     val isListSizeCorrect =
       if (secure && !sensitive)
         list.size == 2
       else
         list.size == 1
-    isListSizeCorrect && list.asScala.forall(
+    isListSizeCorrect && list.forall(
       if (secure)
         TestUtils.isAclSecure(_, sensitive)
       else
@@ -260,14 +283,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * This is used in the testDelete and testDeleteRecursive
    * test cases.
    */
-  private def deleteAllUnsecure() {
+  private def deleteAllUnsecure(): Unit = {
     System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     val result: Try[Boolean] = {
-      deleteRecursive(unsecureZkUtils, "/")
+      deleteRecursive(unsecureZkClient, "/")
     }
     // Clean up before leaving the test case
-    unsecureZkUtils.close()
+    unsecureZkClient.close()
     System.clearProperty(JaasUtils.ZK_SASL_CLIENT)
     
     // Fail the test if able to delete
@@ -280,13 +303,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   /**
    * Tries to delete znodes recursively
    */
-  private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
+  private def deleteRecursive(zkClient: KafkaZkClient, path: String): Try[Boolean] = {
     info(s"Deleting $path")
     var result: Try[Boolean] = Success(true)
-    for (child <- zkUtils.getChildren(path))
+    for (child <- zkClient.getChildren(path))
       result = (path match {
-        case "/" => deleteRecursive(zkUtils, s"/$child")
-        case path => deleteRecursive(zkUtils, s"$path/$child")
+        case "/" => deleteRecursive(zkClient, s"/$child")
+        case path => deleteRecursive(zkClient, s"$path/$child")
       }) match {
         case Success(_) => result
         case Failure(e) => Failure(e)
@@ -297,7 +320,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       // For all other paths, try to delete it
       case path =>
         try {
-          zkUtils.deletePath(path)
+          zkClient.deletePath(path, recursiveDelete = false)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {
           case _: Exception => result
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c3e7312f5af..b5a7583723e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
 import javax.net.ssl.X509TrustManager
+
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
@@ -1132,30 +1132,30 @@ object TestUtils extends Logging {
     }
   }
 
-  private def secureZkPaths(zkUtils: ZkUtils): Seq[String] = {
+  private def secureZkPaths(zkClient: KafkaZkClient): Seq[String] = {
     def subPaths(path: String): Seq[String] = {
-      if (zkUtils.pathExists(path))
-        path +: zkUtils.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
+      if (zkClient.pathExists(path))
+        path +: zkClient.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
       else
         Seq.empty
     }
-    val topLevelPaths = ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths
+    val topLevelPaths = ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths
     topLevelPaths.flatMap(subPaths)
   }
 
   /**
    * Verifies that all secure paths in ZK are created with the expected ACL.
    */
-  def verifySecureZkAcls(zkUtils: ZkUtils, usersWithAccess: Int) {
-    secureZkPaths(zkUtils).foreach(path => {
-      if (zkUtils.pathExists(path)) {
-        val sensitive = ZkUtils.sensitivePath(path)
+  def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: Int) {
+    secureZkPaths(zkClient).foreach(path => {
+      if (zkClient.pathExists(path)) {
+        val sensitive = ZkData.sensitivePath(path)
         // usersWithAccess have ALL access to path. For paths that are
         // not sensitive, world has READ access.
         val aclCount = if (sensitive) usersWithAccess else usersWithAccess + 1
-        val acls = zkUtils.zkConnection.getAcl(path).getKey
+        val acls = zkClient.getAcl(path)
         assertEquals(s"Invalid ACLs for $path $acls", aclCount, acls.size)
-        acls.asScala.foreach(acl => isAclSecure(acl, sensitive))
+        acls.foreach(acl => isAclSecure(acl, sensitive))
       }
     })
   }
@@ -1164,12 +1164,12 @@ object TestUtils extends Logging {
    * Verifies that secure paths in ZK have no access control. This is
    * the case when zookeeper.set.acl=false and no ACLs have been configured.
    */
-  def verifyUnsecureZkAcls(zkUtils: ZkUtils) {
-    secureZkPaths(zkUtils).foreach(path => {
-      if (zkUtils.pathExists(path)) {
-        val acls = zkUtils.zkConnection.getAcl(path).getKey
+  def verifyUnsecureZkAcls(zkClient: KafkaZkClient) {
+    secureZkPaths(zkClient).foreach(path => {
+      if (zkClient.pathExists(path)) {
+        val acls = zkClient.getAcl(path)
         assertEquals(s"Invalid ACLs for $path $acls", 1, acls.size)
-        acls.asScala.foreach(isAclUnsecure)
+        acls.foreach(isAclUnsecure)
       }
     })
   }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 6de91597df9..a8df342998c 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -44,6 +44,7 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.Stat
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -543,6 +544,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
     assertFalse(zkClient.pathExists(path))
+
+    zkClient.createRecursive(path)
+    zkClient.deletePath("/a")
+    assertFalse(zkClient.pathExists(path))
+
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path, recursiveDelete =  false)
+    assertFalse(zkClient.pathExists(path))
+    assertTrue(zkClient.pathExists("/a/b"))
   }
 
   @Test
@@ -1143,6 +1153,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath)
   }
 
+  @Test
+  def testAclMethods(): Unit = {
+    val mockPath = "/foo"
+
+    intercept[NoNodeException] {
+      zkClient.getAcl(mockPath)
+    }
+
+    intercept[NoNodeException] {
+      zkClient.setAcl(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
+    }
+
+    zkClient.createRecursive(mockPath)
+
+    zkClient.setAcl(mockPath, ZooDefs.Ids.READ_ACL_UNSAFE.asScala)
+
+    assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
+  }
+
   class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
     extends KafkaZkClient(zooKeeperClient, isSecure, time) {
     // Overwriting this method from the parent class to force the client to re-register the Broker.
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 8d34c489406..2f75fa27fc8 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -43,7 +43,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   val zkSessionTimeout = 6000
   val zkMaxInFlightRequests = Int.MaxValue
 
-  protected val zkAclsEnabled: Option[Boolean] = None
+  protected def zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
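
For readers skimming the change: the hunks above (a) switch construction from the deprecated ZkUtils factory to KafkaZkClient, and (b) expose a public defaultAcls plus new getAcl/setAcl helpers on KafkaZkClient. A minimal sketch of how those pieces fit together is shown below; it is not part of the PR, and the connection string, timeouts and example path are placeholders.

import kafka.zk.KafkaZkClient
import org.apache.kafka.common.utils.Time

object ZkAclSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connect string and timeouts; the factory call mirrors the ones
    // used by the migrated ZkSecurityMigrator and tests in the diff above.
    val zkClient = KafkaZkClient("localhost:2181", true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
    try {
      val path = "/kafka-acl"                     // illustrative path only
      zkClient.makeSurePersistentPathExists(path)
      // defaultAcls(path) resolves ACLs from the client's secure flag;
      // setAcl/getAcl are the helpers this PR adds to KafkaZkClient.
      zkClient.setAcl(path, zkClient.defaultAcls(path))
      zkClient.getAcl(path).foreach(acl => println(s"$path -> $acl"))
    } finally {
      zkClient.close()
    }
  }
}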


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated ZKUtils usage from ZkSecurityMigrator
> -------------------------------------------------------
>
>                 Key: KAFKA-7259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7259
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Minor
>             Fix For: 2.2.0
>
>
> ZkSecurityMigrator code currently uses ZKUtils.  We can replace ZKUtils usage 
> with KafkaZkClient. Also remove usage of ZKUtils from various tests.
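
For reference, the replacement described above is largely mechanical: the deprecated ZkUtils factory call is swapped for the KafkaZkClient one, and close() is still required. A before/after sketch follows; the helper name is hypothetical and the 6000 ms timeouts are illustrative (they happen to match the values used in the migrated tests), not something mandated by the change.

// Before (deprecated, removed by this PR):
//   val zkUtils = ZkUtils(zkUrl, 6000, 6000, isSecure)
//   try { /* ... */ } finally zkUtils.close()

// After:
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.utils.Time

object ZkClientMigrationSketch {
  // Hypothetical convenience wrapper (not part of the PR) around the new construction style.
  def withZkClient[T](zkUrl: String, isSecure: Boolean)(work: KafkaZkClient => T): T = {
    val zkClient = KafkaZkClient(zkUrl, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM)
    try work(zkClient) finally zkClient.close()
  }
}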



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
