Repository: kafka
Updated Branches:
  refs/heads/trunk f7f8e1121 -> ab6f848ba


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 028201f..ca8a0d6 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -68,7 +68,7 @@ class TopicFilterTest extends JUnitSuite {
       topicCount.getTopicCountMap.head._1
     }
     //lets make sure that the JSON strings are escaping as we expect
-    //if they are not then when they get saved to zookeeper and read back out they will be broken on parse
+    //if they are not then when they get saved to ZooKeeper and read back out they will be broken on parse
     assertEquals("-\\\"-", getTopicCountMapKey("-\"-"))
     assertEquals("-\\\\-", getTopicCountMapKey("-\\-"))
     assertEquals("-\\/-", getTopicCountMapKey("-/-"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 296f4a7..81df1e1 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -18,10 +18,12 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -33,7 +35,7 @@ import scala.collection.mutable
 
 class PartitionStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
-  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = 
null
   private var mockTopicDeletionManager: TopicDeletionManager = null
   private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = 
null
@@ -49,12 +51,12 @@ class PartitionStateMachineTest extends JUnitSuite {
   def setUp(): Unit = {
     controllerContext = new ControllerContext
     controllerContext.epoch = controllerEpoch
-    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = 
EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = 
EasyMock.createMock(classOf[TopicDeletionManager])
     partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
     partitionStateMachine = new PartitionStateMachine(config, new 
StateChangeLogger(brokerId, true, None), controllerContext, 
mockTopicDeletionManager,
-      mockZkUtils, partitionState, mockControllerBrokerRequestBatch)
+      mockZkClient, partitionState, mockControllerBrokerRequestBatch)
   }
 
   @Test
@@ -82,14 +84,14 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition 
-> leaderIsrAndControllerEpoch)))
       .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, 
Seq(brokerId), isNew = true))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -100,12 +102,12 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch)))
-      .andThrow(new ZookeeperClientException("test"))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition 
-> leaderIsrAndControllerEpoch)))
+      .andThrow(new ZooKeeperClientException("test"))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
 
@@ -116,12 +118,12 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition 
-> leaderIsrAndControllerEpoch)))
       .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), 
null)))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
 
@@ -150,22 +152,22 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(PreferredReplicaPartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -182,22 +184,22 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = 
leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId, otherBrokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(ControlledShutdownPartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -233,23 +235,23 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
-    EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
+    EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals()))
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), 
Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -263,14 +265,14 @@ class PartitionStateMachineTest extends JUnitSuite {
     controllerContext.partitionLeadershipInfo.put(partition, 
leaderIsrAndControllerEpoch)
 
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andThrow(new ZookeeperClientException(""))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
+      .andThrow(new ZooKeeperClientException(""))
 
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
 
@@ -285,15 +287,15 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 0afe7c2..6363d41 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -18,9 +18,11 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.GetDataResponse
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -32,7 +34,7 @@ import scala.collection.mutable
 
 class ReplicaStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
-  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = 
null
   private var mockTopicDeletionManager: TopicDeletionManager = null
   private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = 
null
@@ -50,11 +52,11 @@ class ReplicaStateMachineTest extends JUnitSuite {
   def setUp(): Unit = {
     controllerContext = new ControllerContext
     controllerContext.epoch = controllerEpoch
-    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = 
EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = 
EasyMock.createMock(classOf[TopicDeletionManager])
     replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
-    replicaStateMachine = new ReplicaStateMachine(config, new 
StateChangeLogger(brokerId, true, None), controllerContext, 
mockTopicDeletionManager, mockZkUtils,
+    replicaStateMachine = new ReplicaStateMachine(config, new 
StateChangeLogger(brokerId, true, None), controllerContext, 
mockTopicDeletionManager, mockZkClient,
       replicaState, mockControllerBrokerRequestBatch)
   }
 
@@ -155,9 +157,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, 
Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 
@@ -178,19 +180,19 @@ class ReplicaStateMachineTest extends JUnitSuite {
     val adjustedLeaderAndIsr = 
leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
     val updatedLeaderAndIsr = 
adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
     val updatedLeaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
adjustedLeaderAndIsr), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
adjustedLeaderAndIsr), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, 
updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
 
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, 
mockTopicDeletionManager)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, 
mockTopicDeletionManager)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, 
mockTopicDeletionManager)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch, 
mockTopicDeletionManager)
     assertEquals(updatedLeaderIsrAndControllerEpoch, 
controllerContext.partitionLeadershipInfo(partition))
     assertEquals(OfflineReplica, replicaState(replica))
   }
@@ -230,9 +232,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, 
Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 
@@ -244,9 +246,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     
EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, true, 
callbacks.stopReplicaResponseCallback))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, 
callbacks)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(ReplicaDeletionStarted, replicaState(replica))
   }
 
@@ -348,9 +350,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, 
Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
deleted file mode 100644
index d7b46c7..0000000
--- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.controller
-
-import java.net.UnknownHostException
-import java.nio.charset.StandardCharsets
-import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
-
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.{CreateMode, ZooDefs}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
-import org.junit.{After, Test}
-
-class ZookeeperClientTest extends ZooKeeperTestHarness {
-  private val mockPath = "/foo"
-
-  @After
-  override def tearDown() {
-    super.tearDown()
-    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    Configuration.setConfiguration(null)
-  }
-
-  @Test(expected = classOf[UnknownHostException])
-  def testUnresolvableConnectString(): Unit = {
-    new ZookeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
-  }
-
-  @Test(expected = classOf[ZookeeperClientTimeoutException])
-  def testConnectionTimeout(): Unit = {
-    zookeeper.shutdown()
-    new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 
100, null)
-  }
-
-  @Test
-  def testConnection(): Unit = {
-    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-  }
-
-  @Test
-  def testDeleteNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
deleteResponse.resultCode)
-  }
-
-  @Test
-  def testDeleteExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
-    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
-  }
-
-  @Test
-  def testExistsNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
existsResponse.resultCode)
-  }
-
-  @Test
-  def testExistsExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
-    assertEquals("Response code for exists should be OK", Code.OK, 
existsResponse.resultCode)
-  }
-
-  @Test
-  def testGetDataNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
getDataResponse.resultCode)
-  }
-
-  @Test
-  def testGetDataExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val data = bytes
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
-      CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
-    assertArrayEquals("Data for getData should match created znode data", 
data, getDataResponse.data)
-  }
-
-  @Test
-  def testSetDataNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
setDataResponse.resultCode)
-  }
-
-  @Test
-  def testSetDataExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val data = bytes
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
-    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
-    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
-    assertArrayEquals("Data for getData should match setData's data", data, 
getDataResponse.data)
-  }
-
-  @Test
-  def testGetAclNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
getAclResponse.resultCode)
-  }
-
-  @Test
-  def testGetAclExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
-    assertEquals("Response code for getAcl should be OK", Code.OK, 
getAclResponse.resultCode)
-    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
-  }
-
-  @Test
-  def testSetAclNonExistentZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
setAclResponse.resultCode)
-  }
-
-  @Test
-  def testGetChildrenNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, 
getChildrenResponse.resultCode)
-  }
-
-  @Test
-  def testGetChildrenExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
-    assertEquals("getChildren should return no children", Seq.empty[String], 
getChildrenResponse.children)
-  }
-
-  @Test
-  def testGetChildrenExistingZNodeWithChildren(): Unit = {
-    import scala.collection.JavaConverters._
-    val child1 = "child1"
-    val child2 = "child2"
-    val child1Path = mockPath + "/" + child1
-    val child2Path = mockPath + "/" + child2
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val createResponseChild1 = 
zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
-    val createResponseChild2 = 
zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child2 should be OK", Code.OK, 
createResponseChild2.resultCode)
-
-    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
-    assertEquals("getChildren should return two children", Seq(child1, 
child2), getChildrenResponse.children.sorted)
-  }
-
-  @Test
-  def testPipelinedGetData(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 
2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
CreateMode.PERSISTENT))
-    val createResponses = createRequests.map(zookeeperClient.handleRequest)
-    createResponses.foreach(createResponse => assertEquals("Response code for 
create should be OK", Code.OK, createResponse.resultCode))
-    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
-    val getDataResponses = zookeeperClient.handleRequests(getDataRequests)
-    getDataResponses.foreach(getDataResponse => assertEquals("Response code 
for getData should be OK", Code.OK,
-      getDataResponse.resultCode))
-    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
-      assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
-      assertEquals("Data for getData should match", ((i + 1) * 2), 
Integer.valueOf(new String(getDataResponse.data)))
-    }
-  }
-
-  @Test
-  def testMixedPipeline(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    val getDataRequest = GetDataRequest(mockPath)
-    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
-    val responses = zookeeperClient.handleRequests(Seq(getDataRequest, 
setDataRequest))
-    assertEquals("Response code for getData should be OK", Code.OK, 
responses.head.resultCode)
-    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], 
responses.head.asInstanceOf[GetDataResponse].data)
-    assertEquals("Response code for setData should be NONODE", Code.NONODE, 
responses.last.resultCode)
-  }
-
-  @Test
-  def testZNodeChangeHandlerForCreation(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(existsRequest, 
createRequest))
-    assertEquals("Response code for exists should be NONODE", Code.NONODE, 
responses.head.resultCode)
-    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
-    assertTrue("Failed to receive create notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDeletion(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleDeletion(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
-    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
-    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
-    assertTrue("Failed to receive delete notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDataChange(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleDataChange(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
-    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
-    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
-    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
-    assertTrue("Failed to receive data change notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChildChangeHandlerForChildChange(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
-      override def handleChildChange(): Unit = {
-        zNodeChildChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    val child1 = "child1"
-    val child1Path = mockPath + "/" + child1
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
-    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
-    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
-    val createResponseChild1 = 
zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
-    assertTrue("Failed to receive child change notification", 
zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testStateChangeHandlerForAuthFailure(): Unit = {
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, 
"no-such-file-exists.conf")
-    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val stateChangeHandler = new StateChangeHandler {
-      override def onAuthFailure(): Unit = {
-        stateChangeHandlerCountDownLatch.countDown()
-      }
-    }
-    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
stateChangeHandler)
-    assertTrue("Failed to receive auth failed notification", 
stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 
b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index fb76ca1..cfc325d 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -86,7 +86,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with 
Logging {
     for(_ <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, 
"test".getBytes))
 
-    // update offset in zookeeper for consumer to jump "forward" in time
+    // update offset in ZooKeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, 
testConsumer)
     consumerProps.put("auto.offset.reset", resetTo)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index ca17b9a..22a06e7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -239,7 +239,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
     
assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
 
-    //test removing last acl also deletes zookeeper path
+    //test removing last acl also deletes ZooKeeper path
     acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
     changeAclAndVerify(acls, Set.empty[Acl], acls)
     
assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
@@ -405,7 +405,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   def testHighConcurrencyDeletionOfResourceAcls() {
     val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), 
Allow, WildCardHost, All)
 
-    // Alternate authorizer to keep adding and removing zookeeper path
+    // Alternate authorizer to keep adding and removing ZooKeeper path
     val concurrentFuctions = (0 to 50).map { _ =>
       () => {
         simpleAclAuthorizer.addAcls(Set(acl), resource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 4196bc1..31d32d2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -84,7 +84,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <client-id> quotas.
-   * Quota overrides persisted in Zookeeper in /config/clients/<client-id>, 
default persisted in /config/clients/<default>
+   * Quota overrides persisted in ZooKeeper in /config/clients/<client-id>, 
default persisted in /config/clients/<default>
    */
   @Test
   def testClientIdQuotaParsing() {
@@ -97,7 +97,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <user> quotas.
-   * Quota overrides persisted in Zookeeper in /config/users/<user>, default 
persisted in /config/users/<default>
+   * Quota overrides persisted in ZooKeeper in /config/users/<user>, default 
persisted in /config/users/<default>
    */
   @Test
   def testUserQuotaParsing() {
@@ -111,7 +111,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <user, client-id> quotas.
-   * Quotas persisted in Zookeeper in 
/config/users/<user>/clients/<client-id>, default in 
/config/users/<default>/clients/<default>
+   * Quotas persisted in ZooKeeper in 
/config/users/<user>/clients/<client-id>, default in 
/config/users/<default>/clients/<default>
    */
   @Test
   def testUserClientIdQuotaParsing() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a86c160..60f403d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package unit.kafka.server
+package kafka.server
 
 import java.lang.{Long => JLong}
 import java.net.InetAddress
@@ -30,7 +30,6 @@ import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server._
 import kafka.utils.{MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8786b19..306dbc0 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -80,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
@@ -115,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
@@ -154,7 +154,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val topic = topicPartition.split("-").head
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, 
servers = Seq(server))
 
     var offsetChanged = false
@@ -178,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
@@ -207,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 9c51a10..fa373f5 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -139,7 +139,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     updateProducer()
 
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
-    assertTrue("Leader must remain on broker 1, in case of zookeeper session 
expiration it can move to broker 0",
+    assertTrue("Leader must remain on broker 1, in case of ZooKeeper session 
expiration it can move to broker 0",
       leader == 0 || leader == 1)
 
     assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
@@ -150,7 +150,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     server2.startup()
     updateProducer()
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, 
oldLeaderOpt = Some(leader))
-    assertTrue("Leader must remain on broker 0, in case of zookeeper session 
expiration it can move to broker 1",
+    assertTrue("Leader must remain on broker 0, in case of ZooKeeper session 
expiration it can move to broker 1",
       leader == 0 || leader == 1)
 
     sendMessages(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2920730..b8d0afb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -74,7 +74,7 @@ object TestUtils extends Logging {
 
   /** Port to use for unit tests that mock/don't require a real ZK server. */
   val MockZkPort = 1
-  /** Zookeeper connection string to use for unit tests that mock/don't 
require a real ZK server. */
+  /** ZooKeeper connection string to use for unit tests that mock/don't 
require a real ZK server. */
   val MockZkConnect = "127.0.0.1:" + MockZkPort
 
   private val transactionStatusKey = "transactionStatus"
@@ -273,7 +273,7 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a topic in zookeeper.
+   * Create a topic in ZooKeeper.
    * Wait until the leader is elected and the metadata is propagated to all 
brokers.
    * Return the leader for each partition.
    */
@@ -293,7 +293,7 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a topic in zookeeper using a customized replica assignment.
+   * Create a topic in ZooKeeper using a customized replica assignment.
    * Wait until the leader is elected and the metadata is propagated to all 
brokers.
    * Return the leader for each partition.
    */
@@ -1172,7 +1172,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(() =>
       servers.forall(server => topicPartitions.forall(tp => 
server.replicaManager.getPartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
-    // ensure that logs from all replicas are deleted if delete topic is 
marked successful in zookeeper
+    // ensure that logs from all replicas are deleted if delete topic is 
marked successful in ZooKeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.forall(server => topicPartitions.forall(tp => 
server.getLogManager.getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala 
b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 22465ea..36d477c 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -25,19 +25,19 @@ import java.net.InetSocketAddress
 import kafka.utils.CoreUtils
 import org.apache.kafka.common.utils.Utils
 
-class EmbeddedZookeeper() {
+class EmbeddedZooKeeper() {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
   val tickTime = 500
-  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
+  val zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val factory = new NIOServerCnxnFactory()
   private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
   factory.configure(addr, 0)
-  factory.startup(zookeeper)
-  val port = zookeeper.getClientPort()
+  factory.startup(zooKeeperServer)
+  val port = zooKeeperServer.getClientPort()
 
   def shutdown() {
-    CoreUtils.swallow(zookeeper.shutdown())
+    CoreUtils.swallow(zooKeeperServer.shutdown())
     CoreUtils.swallow(factory.shutdown())
 
     def isDown(): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala 
b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 07978b9..0cf836d 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -38,7 +38,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createPersistentPath(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -62,7 +62,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.makeSurePersistentPathExists(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -86,7 +86,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -111,7 +111,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createSequentialPersistentPath(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala 
b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0a7e631..6bedba3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -41,14 +41,14 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with 
Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkUtils: ZkUtils = null
-  var zookeeper: EmbeddedZookeeper = null
+  var zookeeper: EmbeddedZooKeeper = null
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
   
   @Before
   def setUp() {
-    zookeeper = new EmbeddedZookeeper()
+    zookeeper = new EmbeddedZooKeeper()
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
new file mode 100644
index 0000000..d595221
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zookeeper
+
+import java.net.UnknownHostException
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
+
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.{After, Test}
+
+class ZooKeeperClientTest extends ZooKeeperTestHarness {
+  private val mockPath = "/foo"
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+
+  @Test(expected = classOf[UnknownHostException])
+  def testUnresolvableConnectString(): Unit = {
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
+  }
+
+  @Test(expected = classOf[ZooKeeperClientTimeoutException])
+  def testConnectionTimeout(): Unit = {
+    zookeeper.shutdown()
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 
100, null)
+  }
+
+  @Test
+  def testConnection(): Unit = {
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+  }
+
+  @Test
+  def testDeleteNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
deleteResponse.resultCode)
+  }
+
+  @Test
+  def testDeleteExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
+  }
+
+  @Test
+  def testExistsNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
existsResponse.resultCode)
+  }
+
+  @Test
+  def testExistsExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code for exists should be OK", Code.OK, 
existsResponse.resultCode)
+  }
+
+  @Test
+  def testGetDataNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getDataResponse = 
zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getDataResponse.resultCode)
+  }
+
+  @Test
+  def testGetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
+      CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getDataResponse = 
zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
+    assertArrayEquals("Data for getData should match created znode data", 
data, getDataResponse.data)
+  }
+
+  @Test
+  def testSetDataNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val setDataResponse = 
zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
setDataResponse.resultCode)
+  }
+
+  @Test
+  def testSetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val setDataResponse = 
zooKeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
+    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
+    val getDataResponse = 
zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
+    assertArrayEquals("Data for getData should match setData's data", data, 
getDataResponse.data)
+  }
+
+  @Test
+  def testGetAclNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getAclResponse.resultCode)
+  }
+
+  @Test
+  def testGetAclExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code for getAcl should be OK", Code.OK, 
getAclResponse.resultCode)
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
+  }
+
+  @Test
+  def testSetAclNonExistentZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
setAclResponse.resultCode)
+  }
+
+  @Test
+  def testGetChildrenNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getChildrenResponse = 
zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getChildrenResponse.resultCode)
+  }
+
+  @Test
+  def testGetChildrenExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getChildrenResponse = 
zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
+    assertEquals("getChildren should return no children", Seq.empty[String], 
getChildrenResponse.children)
+  }
+
+  @Test
+  def testGetChildrenExistingZNodeWithChildren(): Unit = {
+    import scala.collection.JavaConverters._
+    val child1 = "child1"
+    val child2 = "child2"
+    val child1Path = mockPath + "/" + child1
+    val child2Path = mockPath + "/" + child2
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val createResponseChild1 = 
zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
+    val createResponseChild2 = 
zooKeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child2 should be OK", Code.OK, 
createResponseChild2.resultCode)
+
+    val getChildrenResponse = 
zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
+    assertEquals("getChildren should return two children", Seq(child1, 
child2), getChildrenResponse.children.sorted)
+  }
+
+  @Test
+  def testPipelinedGetData(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 
2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
CreateMode.PERSISTENT))
+    val createResponses = createRequests.map(zooKeeperClient.handleRequest)
+    createResponses.foreach(createResponse => assertEquals("Response code for 
create should be OK", Code.OK, createResponse.resultCode))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
+    val getDataResponses = zooKeeperClient.handleRequests(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code 
for getData should be OK", Code.OK,
+      getDataResponse.resultCode))
+    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
+      assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
+      assertEquals("Data for getData should match", ((i + 1) * 2), 
Integer.valueOf(new String(getDataResponse.data)))
+    }
+  }
+
+  @Test
+  def testMixedPipeline(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getDataRequest = GetDataRequest(mockPath)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
+    val responses = zooKeeperClient.handleRequests(Seq(getDataRequest, 
setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, 
responses.head.resultCode)
+    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], 
responses.head.asInstanceOf[GetDataResponse].data)
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, 
responses.last.resultCode)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForCreation(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(existsRequest, 
createRequest))
+    assertEquals("Response code for exists should be NONODE", Code.NONODE, 
responses.head.resultCode)
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
+    assertTrue("Failed to receive create notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDeletion(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleDeletion(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
+    assertTrue("Failed to receive delete notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleDataChange(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
+    val setDataResponse = 
zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
+    assertTrue("Failed to receive data change notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChildChangeHandlerForChildChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
+      override def handleChildChange(): Unit = {
+        zNodeChildChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val child1 = "child1"
+    val child1Path = mockPath + "/" + child1
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val getChildrenResponse = 
zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
+    val createResponseChild1 = 
zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
+    assertTrue("Failed to receive child change notification", 
zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testStateChangeHandlerForAuthFailure(): Unit = {
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, 
"no-such-file-exists.conf")
+    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override def onAuthFailure(): Unit = {
+        stateChangeHandlerCountDownLatch.countDown()
+      }
+    }
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
stateChangeHandler)
+    assertTrue("Failed to receive auth failed notification", 
stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 367e489..0145827 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -20,7 +20,7 @@ import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.zk.EmbeddedZooKeeper;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
@@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random 
port being selected
     private static final int TOPIC_CREATION_TIMEOUT = 30000;
     private static final int TOPIC_DELETION_TIMEOUT = 30000;
-    private EmbeddedZookeeper zookeeper = null;
+    private EmbeddedZooKeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
     private ZkUtils zkUtils = null;
 
@@ -84,7 +84,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     public void start() throws IOException, InterruptedException {
         log.debug("Initiating embedded Kafka cluster startup");
         log.debug("Starting a ZooKeeper instance");
-        zookeeper = new EmbeddedZookeeper();
+        zookeeper = new EmbeddedZooKeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
         zkUtils = ZkUtils.apply(

Reply via email to