Repository: kafka
Updated Branches:
  refs/heads/trunk efefb452d -> b1cd6c530


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 16e1486..1c87b5e 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -18,6 +18,7 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.zookeeper.KeeperException
@@ -145,7 +146,7 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param targetState The end state that the partition should be moved to
    */
   private def doHandleStateChanges(partitions: Seq[TopicAndPartition], 
targetState: PartitionState,
-                           partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy] = None): Unit = {
+                           partitionLeaderElectionStrategyOpt: 
Option[PartitionLeaderElectionStrategy]): Unit = {
     val stateChangeLog = 
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     partitions.foreach(partition => partitionState.getOrElseUpdate(partition, 
NonExistentPartition))
     val (validPartitions, invalidPartitions) = partitions.partition(partition 
=> isValidTransition(partition, targetState))
@@ -223,8 +224,8 @@ class PartitionStateMachine(config: KafkaConfig,
         Seq.empty
     }
     createResponses.foreach { createResponse =>
-      val code = Code.get(createResponse.rc)
-      val partition = createResponse.ctx.asInstanceOf[TopicAndPartition]
+      val code = createResponse.resultCode
+      val partition = createResponse.ctx.get.asInstanceOf[TopicAndPartition]
       val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
       if (code == Code.OK) {
         controllerContext.partitionLeadershipInfo.put(partition, 
leaderIsrAndControllerEpoch)
@@ -275,9 +276,7 @@ class PartitionStateMachine(config: KafkaConfig,
    *         3. Exceptions corresponding to failed elections that should not 
be retried.
    */
   private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], 
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
-  (Seq[TopicAndPartition],
-    Seq[TopicAndPartition],
-    Map[TopicAndPartition, Exception]) = {
+  (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, 
Exception]) = {
     val getDataResponses = try {
       zkUtils.getTopicPartitionStatesRaw(partitions)
     } catch {
@@ -287,20 +286,20 @@ class PartitionStateMachine(config: KafkaConfig,
     val failedElections = mutable.Map.empty[TopicAndPartition, Exception]
     val leaderIsrAndControllerEpochPerPartition = 
mutable.Buffer.empty[(TopicAndPartition, LeaderIsrAndControllerEpoch)]
     getDataResponses.foreach { getDataResponse =>
-      val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition]
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
       val currState = partitionState(partition)
-      if (Code.get(getDataResponse.rc) == Code.OK) {
+      if (getDataResponse.resultCode == Code.OK) {
         val leaderIsrAndControllerEpochOpt = 
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
         if (leaderIsrAndControllerEpochOpt.isEmpty) {
           val exception = new StateChangeFailedException(s"LeaderAndIsr 
information doesn't exist for partition $partition in $currState state")
           failedElections.put(partition, exception)
         }
         leaderIsrAndControllerEpochPerPartition += partition -> 
leaderIsrAndControllerEpochOpt.get
-      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+      } else if (getDataResponse.resultCode == Code.NONODE) {
         val exception = new StateChangeFailedException(s"LeaderAndIsr 
information doesn't exist for partition $partition in $currState state")
         failedElections.put(partition, exception)
       } else {
-        failedElections.put(partition, 
KeeperException.create(Code.get(getDataResponse.rc)))
+        failedElections.put(partition, getDataResponse.resultException.get)
       }
     }
     val (invalidPartitionsForElection, validPartitionsForElection) = 
leaderIsrAndControllerEpochPerPartition.partition { case (partition, 
leaderIsrAndControllerEpoch) =>
@@ -332,7 +331,8 @@ class PartitionStateMachine(config: KafkaConfig,
     }
     val recipientsPerPartition = partitionsWithLeaders.map { case (partition, 
leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, 
leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
-    val (successfulUpdates, updatesToRetry, failedUpdates) = 
zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch)
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, 
failedUpdates) = zkUtils.updateLeaderAndIsr(
+      adjustedLeaderAndIsrs, controllerContext.epoch)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
       val replicas = controllerContext.partitionReplicaAssignment(partition)
       val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index f811612..4da1c7b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -19,9 +19,9 @@ package kafka.controller
 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
 import kafka.controller.Callbacks.CallbackBuilder
+import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
-import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
@@ -292,7 +292,8 @@ class ReplicaStateMachine(config: KafkaConfig,
       val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else 
leaderAndIsr.isr.filter(_ != replicaId)
       leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
     }
-    val (successfulUpdates, updatesToRetry, failedUpdates) = 
zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch)
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, 
failedUpdates) = zkUtils.updateLeaderAndIsr(
+      adjustedLeaderAndIsrs, controllerContext.epoch)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = 
partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
       if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
         val exception = new StateChangeFailedException(s"Failed to change 
state of replica $replicaId for partition $partition since the leader and isr 
path in zookeeper is empty")
@@ -331,8 +332,8 @@ class ReplicaStateMachine(config: KafkaConfig,
         return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, 
failed.toMap)
     }
     getDataResponses.foreach { getDataResponse =>
-      val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition]
-      if (Code.get(getDataResponse.rc) == Code.OK) {
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      if (getDataResponse.resultCode == Code.OK) {
         val leaderIsrAndControllerEpochOpt = 
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
         if (leaderIsrAndControllerEpochOpt.isEmpty) {
           partitionsWithNoLeaderAndIsrInZk += partition
@@ -347,10 +348,10 @@ class ReplicaStateMachine(config: KafkaConfig,
             leaderAndIsrs.put(partition, 
leaderIsrAndControllerEpoch.leaderAndIsr)
           }
         }
-      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+      } else if (getDataResponse.resultCode == Code.NONODE) {
         partitionsWithNoLeaderAndIsrInZk += partition
       } else {
-        failed.put(partition, 
KeeperException.create(Code.get(getDataResponse.rc)))
+        failed.put(partition, getDataResponse.resultException.get)
       }
     }
     (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
index e68c738..0009439 100644
--- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala
+++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
@@ -23,10 +23,13 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, 
DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
 import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
+import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, 
Watcher, ZooKeeper}
+
+import scala.collection.JavaConverters._
 
 /**
  * ZookeeperClient is a zookeeper client that encourages pipelined requests to 
zookeeper.
@@ -36,88 +39,106 @@ import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
  * @param connectionTimeoutMs connection timeout in milliseconds
  * @param stateChangeHandler state change handler callbacks called by the 
underlying zookeeper client's EventThread.
  */
-class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, 
connectionTimeoutMs: Int, stateChangeHandler: StateChangeHandler) extends 
Logging {
-  this.logIdent = "[ZookeeperClient]: "
+class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, 
connectionTimeoutMs: Int,
+                      stateChangeHandler: StateChangeHandler) extends Logging {
+  this.logIdent = "[ZookeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
   private val isConnectedOrExpiredCondition = 
isConnectedOrExpiredLock.newCondition()
-  private val zNodeChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChangeHandler]()
-  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]()
+  private val zNodeChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChangeHandler]().asScala
+  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]().asScala
 
   info(s"Initializing a new session to $connectString.")
   @volatile private var zooKeeper = new ZooKeeper(connectString, 
sessionTimeoutMs, ZookeeperClientWatcher)
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   /**
-   * Take an AsyncRequest and wait for its AsyncResponse. See 
handle(Seq[AsyncRequest]) for details.
+   * Send a request and wait for its response. See handle(Seq[AsyncRequest]) 
for details.
    *
-   * @param request a single AsyncRequest to wait on.
-   * @return the request's AsyncResponse.
+   * @param request a single request to send and wait on.
+   * @return an instance of the response with the specific type (e.g. 
CreateRequest -> CreateResponse).
    */
-  def handle(request: AsyncRequest): AsyncResponse = {
-    handle(Seq(request)).head
+  def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
+    handleRequests(Seq(request)).head
   }
 
   /**
-   * Pipeline a sequence of AsyncRequests and wait for all of their 
AsyncResponses.
+   * Send a pipelined sequence of requests and wait for all of their responses.
    *
    * The watch flag on each outgoing request will be set if we've already 
registered a handler for the
-   * path associated with the AsyncRequest.
+   * path associated with the request.
    *
-   * @param requests a sequence of AsyncRequests to wait on.
-   * @return the AsyncResponses.
+   * @param requests a sequence of requests to send and wait on.
+   * @return the responses for the requests. If all requests have the same 
type, the responses will have the respective
+   * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). 
Otherwise, the most specific common supertype
+   * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
    */
-  def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = 
inReadLock(initializationLock) {
-    import scala.collection.JavaConverters._
-    if (requests.isEmpty) {
-      return Seq.empty
-    }
-    val countDownLatch = new CountDownLatch(requests.size)
-    val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size)
-    requests.foreach {
-      case CreateRequest(path, data, acl, createMode, ctx) => 
zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, name: 
String) = {
-          responseQueue.add(CreateResponse(rc, path, ctx, name))
-          countDownLatch.countDown()
-        }}, ctx)
-      case DeleteRequest(path, version, ctx) => zooKeeper.delete(path, 
version, new VoidCallback {
-        override def processResult(rc: Int, path: String, ctx: Any) = {
-          responseQueue.add(DeleteResponse(rc, path, ctx))
-          countDownLatch.countDown()
-        }}, ctx)
-      case ExistsRequest(path, ctx) => zooKeeper.exists(path, 
zNodeChangeHandlers.containsKey(path), new StatCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
-          responseQueue.add(ExistsResponse(rc, path, ctx, stat))
-          countDownLatch.countDown()
-        }}, ctx)
-      case GetDataRequest(path, ctx) => zooKeeper.getData(path, 
zNodeChangeHandlers.containsKey(path), new DataCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, data: 
Array[Byte], stat: Stat) = {
-          responseQueue.add(GetDataResponse(rc, path, ctx, data, stat))
-          countDownLatch.countDown()
-        }}, ctx)
-      case SetDataRequest(path, data, version, ctx) => zooKeeper.setData(path, 
data, version, new StatCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
-          responseQueue.add(SetDataResponse(rc, path, ctx, stat))
-          countDownLatch.countDown()
-        }}, ctx)
-      case GetACLRequest(path, ctx) => zooKeeper.getACL(path, null, new 
ACLCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, acl: 
java.util.List[ACL], stat: Stat): Unit = {
-          responseQueue.add(GetACLResponse(rc, path, ctx, 
Option(acl).map(_.asScala).orNull, stat))
-          countDownLatch.countDown()
-        }}, ctx)
-      case SetACLRequest(path, acl, version, ctx) => zooKeeper.setACL(path, 
acl.asJava, version, new StatCallback {
-        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
-          responseQueue.add(SetACLResponse(rc, path, ctx, stat))
-          countDownLatch.countDown()
-        }}, ctx)
-      case GetChildrenRequest(path, ctx) => zooKeeper.getChildren(path, 
zNodeChildChangeHandlers.containsKey(path), new Children2Callback {
-        override def processResult(rc: Int, path: String, ctx: Any, children: 
java.util.List[String], stat: Stat) = {
-          responseQueue.add(GetChildrenResponse(rc, path, ctx, 
Option(children).map(_.asScala).orNull, stat))
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): 
Seq[Req#Response] = inReadLock(initializationLock) {
+    if (requests.isEmpty)
+      Seq.empty
+    else {
+      val countDownLatch = new CountDownLatch(requests.size)
+      val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
+
+      requests.foreach { request =>
+        send(request) { response =>
+          responseQueue.add(response)
           countDownLatch.countDown()
-        }}, ctx)
+        }
+      }
+      countDownLatch.await()
+      responseQueue.asScala.toBuffer
+    }
+  }
+
+  private def send[Req <: AsyncRequest](request: Req)(processResponse: 
Req#Response => Unit): Unit = {
+    // Safe to cast as we always create a response of the right type
+    def callback(response: AsyncResponse): Unit = 
processResponse(response.asInstanceOf[Req#Response])
+
+    request match {
+      case ExistsRequest(path, ctx) =>
+        zooKeeper.exists(path, shouldWatch(request), new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat): Unit =
+            callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
+      case GetDataRequest(path, ctx) =>
+        zooKeeper.getData(path, shouldWatch(request), new DataCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, data: 
Array[Byte], stat: Stat): Unit =
+            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, 
stat))
+        }, ctx.orNull)
+      case GetChildrenRequest(path, ctx) =>
+        zooKeeper.getChildren(path, shouldWatch(request), new 
Children2Callback {
+          override def processResult(rc: Int, path: String, ctx: Any, 
children: java.util.List[String], stat: Stat): Unit =
+            callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
+              Option(children).map(_.asScala).getOrElse(Seq.empty), stat))
+        }, ctx.orNull)
+      case CreateRequest(path, data, acl, createMode, ctx) =>
+        zooKeeper.create(path, data, acl.asJava, createMode, new 
StringCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, name: 
String): Unit =
+            callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
+        }, ctx.orNull)
+      case SetDataRequest(path, data, version, ctx) =>
+        zooKeeper.setData(path, data, version, new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat): Unit =
+            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
+      case DeleteRequest(path, version, ctx) =>
+        zooKeeper.delete(path, version, new VoidCallback {
+          override def processResult(rc: Int, path: String, ctx: Any): Unit =
+            callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
+        }, ctx.orNull)
+      case GetAclRequest(path, ctx) =>
+        zooKeeper.getACL(path, null, new ACLCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, acl: 
java.util.List[ACL], stat: Stat): Unit = {
+            callback(GetAclResponse(Code.get(rc), path, Option(ctx), 
Option(acl).map(_.asScala).getOrElse(Seq.empty),
+              stat))
+        }}, ctx.orNull)
+      case SetAclRequest(path, acl, version, ctx) =>
+        zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat): Unit =
+            callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
     }
-    countDownLatch.await()
-    responseQueue.asScala.toSeq
   }
 
   /**
@@ -150,6 +171,14 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
     info("Connected.")
   }
 
+  // If this method is changed, the documentation for 
registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
+  // may need to be updated.
+  private def shouldWatch(request: AsyncRequest): Boolean = request match {
+    case _: GetChildrenRequest => 
zNodeChildChangeHandlers.contains(request.path)
+    case _: ExistsRequest | _: GetDataRequest => 
zNodeChangeHandlers.contains(request.path)
+    case _ => throw new IllegalArgumentException(s"Request $request is not 
watchable")
+  }
+
   /**
    * Register the handler to ZookeeperClient. This is just a local operation. 
This does not actually register a watcher.
    *
@@ -199,7 +228,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
     info("Closed.")
   }
 
-  def sessionId = inReadLock(initializationLock) {
+  def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
 
@@ -231,29 +260,29 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
   private object ZookeeperClientWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
       debug("Received event: " + event)
-      if (event.getPath == null) {
-        inLock(isConnectedOrExpiredLock) {
-          isConnectedOrExpiredCondition.signalAll()
-        }
-        if (event.getState == KeeperState.AuthFailed) {
-          info("Auth failed.")
-          stateChangeHandler.onAuthFailure()
-        } else if (event.getState == KeeperState.Expired) {
-          inWriteLock(initializationLock) {
-            info("Session expired.")
-            stateChangeHandler.beforeInitializingSession()
-            initialize()
-            stateChangeHandler.afterInitializingSession()
+      Option(event.getPath) match {
+        case None =>
+          inLock(isConnectedOrExpiredLock) {
+            isConnectedOrExpiredCondition.signalAll()
+          }
+          if (event.getState == KeeperState.AuthFailed) {
+            info("Auth failed.")
+            stateChangeHandler.onAuthFailure()
+          } else if (event.getState == KeeperState.Expired) {
+            inWriteLock(initializationLock) {
+              info("Session expired.")
+              stateChangeHandler.beforeInitializingSession()
+              initialize()
+              stateChangeHandler.afterInitializingSession()
+            }
+          }
+        case Some(path) =>
+          (event.getType: @unchecked) match {
+            case EventType.NodeChildrenChanged => 
zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
+            case EventType.NodeCreated => 
zNodeChangeHandlers.get(path).foreach(_.handleCreation())
+            case EventType.NodeDeleted => 
zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
+            case EventType.NodeDataChanged => 
zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
           }
-        }
-      } else if (event.getType == EventType.NodeCreated) {
-        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation())
-      } else if (event.getType == EventType.NodeDeleted) {
-        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion())
-      } else if (event.getType == EventType.NodeDataChanged) {
-        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange())
-      } else if (event.getType == EventType.NodeChildrenChanged) {
-        
Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange())
       }
     }
   }
@@ -279,33 +308,67 @@ trait ZNodeChildChangeHandler {
 }
 
 sealed trait AsyncRequest {
-  val path: String
-  val ctx: Any
+  /**
+   * This type member allows us to define methods that take requests and 
return responses with the correct types.
+   * See ``ZookeeperClient.handleRequests`` for example.
+   */
+  type Response <: AsyncResponse
+  def path: String
+  def ctx: Option[Any]
+}
+
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], 
createMode: CreateMode,
+                         ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = CreateResponse
+}
+
+case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) 
extends AsyncRequest {
+  type Response = DeleteResponse
+}
+
+case class ExistsRequest(path: String, ctx: Option[Any] = None) extends 
AsyncRequest {
+  type Response = ExistsResponse
+}
+
+case class GetDataRequest(path: String, ctx: Option[Any] = None) extends 
AsyncRequest {
+  type Response = GetDataResponse
+}
+
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: 
Option[Any] = None) extends AsyncRequest {
+  type Response = SetDataResponse
+}
+
+case class GetAclRequest(path: String, ctx: Option[Any] = None) extends 
AsyncRequest {
+  type Response = GetAclResponse
+}
+
+case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: 
Option[Any] = None) extends AsyncRequest {
+  type Response = SetAclResponse
+}
+
+case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends 
AsyncRequest {
+  type Response = GetChildrenResponse
 }
-case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], 
createMode: CreateMode, ctx: Any) extends AsyncRequest
-case class DeleteRequest(path: String, version: Int, ctx: Any) extends 
AsyncRequest
-case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest
-case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest
-case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: 
Any) extends AsyncRequest
-case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest
-case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) 
extends AsyncRequest
-case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest
 
 sealed trait AsyncResponse {
-  val rc: Int
-  val path: String
-  val ctx: Any
+  def resultCode: Code
+  def path: String
+  def ctx: Option[Any]
+
+  /** Return None if the result code is OK and KeeperException otherwise. */
+  def resultException: Option[KeeperException] =
+    if (resultCode == Code.OK) None else 
Some(KeeperException.create(resultCode, path))
 }
-case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) 
extends AsyncResponse
-case class DeleteResponse(rc: Int, path: String, ctx: Any) extends 
AsyncResponse
-case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends 
AsyncResponse
-case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], 
stat: Stat) extends AsyncResponse
-case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) 
extends AsyncResponse
-case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], 
stat: Stat) extends AsyncResponse
-case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends 
AsyncResponse
-case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: 
Seq[String], stat: Stat) extends AsyncResponse
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], 
name: String) extends AsyncResponse
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) 
extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], 
stat: Stat) extends AsyncResponse
+case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], 
data: Array[Byte], stat: Stat) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], 
stat: Stat) extends AsyncResponse
+case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], 
acl: Seq[ACL], stat: Stat) extends AsyncResponse
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], 
stat: Stat) extends AsyncResponse
+case class GetChildrenResponse(resultCode: Code, path: String, ctx: 
Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse
 
 class ZookeeperClientException(message: String) extends 
RuntimeException(message)
 class ZookeeperClientExpiredException(message: String) extends 
ZookeeperClientException(message)
 class ZookeeperClientAuthFailedException(message: String) extends 
ZookeeperClientException(message)
-class ZookeeperClientTimeoutException(message: String) extends 
ZookeeperClientException(message)
\ No newline at end of file
+class ZookeeperClientTimeoutException(message: String) extends 
ZookeeperClientException(message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 38e0b66..cc38667 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -24,7 +24,7 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, 
TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
-import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, 
ReassignedPartitionsContext}
+import kafka.controller.{LeaderIsrAndControllerEpoch, 
ReassignedPartitionsContext}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 1214344..296f4a7 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -18,6 +18,7 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
+import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -82,7 +83,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch)))
-      .andReturn(Seq(CreateResponse(Code.OK.intValue(), null, partition, 
null)))
+      .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, 
Seq(brokerId), isNew = true))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -116,7 +117,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), 
controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> 
leaderIsrAndControllerEpoch)))
-      .andReturn(Seq(CreateResponse(Code.NODEEXISTS.intValue(), null, 
partition, null)))
+      .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), 
null)))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Option(OfflinePartitionLeaderElectionStrategy))
@@ -150,13 +151,13 @@ class PartitionStateMachineTest extends JUnitSuite {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
-      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId), isNew = false))
@@ -182,13 +183,13 @@ class PartitionStateMachineTest extends JUnitSuite {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = 
leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
-      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId, otherBrokerId), isNew = false))
@@ -233,14 +234,15 @@ class PartitionStateMachineTest extends JUnitSuite {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, 
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch))
-      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), 
Seq(brokerId), isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -284,7 +286,8 @@ class PartitionStateMachineTest extends JUnitSuite {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.NONODE.intValue(), null, partition, 
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+      .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition),
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala 
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 62c28a0..0afe7c2 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -18,6 +18,7 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
+import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.zookeeper.KeeperException.Code
@@ -178,10 +179,10 @@ class ReplicaStateMachineTest extends JUnitSuite {
     val updatedLeaderAndIsr = 
adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
     val updatedLeaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
     EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
     EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> 
adjustedLeaderAndIsr), controllerEpoch))
-      .andReturn(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
     
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, 
updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
index 9f172f0..d7b46c7 100644
--- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -58,42 +58,42 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testDeleteNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(deleteResponse.rc))
+    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
deleteResponse.resultCode)
   }
 
   @Test
   def testDeleteExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
-    assertEquals("Response code for delete should be OK", Code.OK, 
Code.get(deleteResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
   }
 
   @Test
   def testExistsNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, 
null)).asInstanceOf[ExistsResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(existsResponse.rc))
+    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
existsResponse.resultCode)
   }
 
   @Test
   def testExistsExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, 
null)).asInstanceOf[ExistsResponse]
-    assertEquals("Response code for exists should be OK", Code.OK, 
Code.get(existsResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code for exists should be OK", Code.OK, 
existsResponse.resultCode)
   }
 
   @Test
   def testGetDataNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getDataResponse.rc))
+    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getDataResponse.resultCode)
   }
 
   @Test
@@ -101,18 +101,19 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     import scala.collection.JavaConverters._
     val data = bytes
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
-    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
+      CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
     assertArrayEquals("Data for getData should match created znode data", 
data, getDataResponse.data)
   }
 
   @Test
   def testSetDataNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(setDataResponse.rc))
+    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
setDataResponse.resultCode)
   }
 
   @Test
@@ -120,56 +121,58 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     import scala.collection.JavaConverters._
     val data = bytes
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
data, -1, null)).asInstanceOf[SetDataResponse]
-    assertEquals("Response code for setData should be OK", Code.OK, 
Code.get(setDataResponse.rc))
-    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
-    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
+    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
+    val getDataResponse = 
zookeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
     assertArrayEquals("Data for getData should match setData's data", data, 
getDataResponse.data)
   }
 
   @Test
-  def testGetACLNonExistentZNode(): Unit = {
+  def testGetAclNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, 
null)).asInstanceOf[GetACLResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getACLResponse.rc))
+    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getAclResponse.resultCode)
   }
 
   @Test
-  def testGetACLExistingZNode(): Unit = {
+  def testGetAclExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, 
null)).asInstanceOf[GetACLResponse]
-    assertEquals("Response code for getACL should be OK", Code.OK, 
Code.get(getACLResponse.rc))
-    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getACLResponse.acl)
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code for getAcl should be OK", Code.OK, 
getAclResponse.resultCode)
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
   }
 
   @Test
-  def testSetACLNonExistentZNode(): Unit = {
+  def testSetAclNonExistentZNode(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1, null)).asInstanceOf[SetACLResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(setACLResponse.rc))
+    val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
setAclResponse.resultCode)
   }
 
   @Test
   def testGetChildrenNonExistentZNode(): Unit = {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
-    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getChildrenResponse.rc))
+    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, 
getChildrenResponse.resultCode)
   }
 
   @Test
   def testGetChildrenExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
Code.get(getChildrenResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
     assertEquals("getChildren should return no children", Seq.empty[String], 
getChildrenResponse.children)
   }
 
@@ -181,15 +184,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val child1Path = mockPath + "/" + child1
     val child2Path = mockPath + "/" + child2
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val createResponseChild1 = 
zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create child1 should be OK", Code.OK, 
Code.get(createResponseChild1.rc))
-    val createResponseChild2 = 
zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create child2 should be OK", Code.OK, 
Code.get(createResponseChild2.rc))
-
-    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
Code.get(getChildrenResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val createResponseChild1 = 
zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
+    val createResponseChild2 = 
zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child2 should be OK", Code.OK, 
createResponseChild2.resultCode)
+
+    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
     assertEquals("getChildren should return two children", Seq(child1, 
child2), getChildrenResponse.children.sorted)
   }
 
@@ -197,15 +203,16 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
   def testPipelinedGetData(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 
2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
CreateMode.PERSISTENT, null))
-    val createResponses = createRequests.map(zookeeperClient.handle)
-    createResponses.foreach(createResponse => assertEquals("Response code for 
create should be OK", Code.OK, Code.get(createResponse.rc)))
-    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null))
-    val getDataResponses = zookeeperClient.handle(getDataRequests)
-    getDataResponses.foreach(getDataResponse => assertEquals("Response code 
for getData should be OK", Code.OK, Code.get(getDataResponse.rc)))
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 
2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
CreateMode.PERSISTENT))
+    val createResponses = createRequests.map(zookeeperClient.handleRequest)
+    createResponses.foreach(createResponse => assertEquals("Response code for 
create should be OK", Code.OK, createResponse.resultCode))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
+    val getDataResponses = zookeeperClient.handleRequests(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code 
for getData should be OK", Code.OK,
+      getDataResponse.resultCode))
     getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
-      assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
-      assertEquals("Data for getData should match", ((i + 1) * 2), 
Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data)))
+      assertEquals("Response code for getData should be OK", Code.OK, 
getDataResponse.resultCode)
+      assertEquals("Data for getData should match", ((i + 1) * 2), 
Integer.valueOf(new String(getDataResponse.data)))
     }
   }
 
@@ -213,14 +220,15 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
   def testMixedPipeline(): Unit = {
     import scala.collection.JavaConverters._
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
-    val getDataRequest = GetDataRequest(mockPath, null)
-    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, 
null)
-    val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest))
-    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(responses.head.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+    val getDataRequest = GetDataRequest(mockPath)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
+    val responses = zookeeperClient.handleRequests(Seq(getDataRequest, 
setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, 
responses.head.resultCode)
     assertArrayEquals("Data for getData should be empty", Array.empty[Byte], 
responses.head.asInstanceOf[GetDataResponse].data)
-    assertEquals("Response code for setData should be NONODE", Code.NONODE, 
Code.get(responses.last.rc))
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, 
responses.last.resultCode)
   }
 
   @Test
@@ -236,11 +244,11 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath, null)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
-    val responses = zookeeperClient.handle(Seq(existsRequest, createRequest))
-    assertEquals("Response code for exists should be NONODE", Code.NONODE, 
Code.get(responses.head.rc))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(responses.last.rc))
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zookeeperClient.handleRequests(Seq(existsRequest, 
createRequest))
+    assertEquals("Response code for exists should be NONODE", Code.NONODE, 
responses.head.resultCode)
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
     assertTrue("Failed to receive create notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
@@ -257,13 +265,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath, null)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
-    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(responses.last.rc))
-    assertEquals("Response code for exists should be OK", Code.OK, 
Code.get(responses.head.rc))
-    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
-    assertEquals("Response code for delete should be OK", Code.OK, 
Code.get(deleteResponse.rc))
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zookeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
+    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, 
-1))
+    assertEquals("Response code for delete should be OK", Code.OK, 
deleteResponse.resultCode)
     assertTrue("Failed to receive delete notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
@@ -280,13 +288,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath, null)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
-    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(responses.last.rc))
-    assertEquals("Response code for exists should be OK", Code.OK, 
Code.get(responses.head.rc))
-    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse]
-    assertEquals("Response code for setData should be OK", Code.OK, 
Code.get(setDataResponse.rc))
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zookeeperClient.handleRequests(Seq(createRequest, 
existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, 
responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, 
responses.head.resultCode)
+    val setDataResponse = 
zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code for setData should be OK", Code.OK, 
setDataResponse.resultCode)
     assertTrue("Failed to receive data change notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
@@ -304,13 +312,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
 
     val child1 = "child1"
     val child1Path = mockPath + "/" + child1
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
-    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
     zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
-    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
-    assertEquals("Response code for getChildren should be OK", Code.OK, 
Code.get(getChildrenResponse.rc))
-    val createResponseChild1 = 
zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create child1 should be OK", Code.OK, 
Code.get(createResponseChild1.rc))
+    val getChildrenResponse = 
zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
getChildrenResponse.resultCode)
+    val createResponseChild1 = 
zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
createResponseChild1.resultCode)
     assertTrue("Failed to receive child change notification", 
zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 

Reply via email to