Repository: kafka Updated Branches: refs/heads/trunk efefb452d -> b1cd6c530
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 16e1486..1c87b5e 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -18,6 +18,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.{StateChangeFailedException, TopicAndPartition} +import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.server.KafkaConfig import kafka.utils.Logging import org.apache.zookeeper.KeeperException @@ -145,7 +146,7 @@ class PartitionStateMachine(config: KafkaConfig, * @param targetState The end state that the partition should be moved to */ private def doHandleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState, - partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = { + partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = { val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition)) val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState)) @@ -223,8 +224,8 @@ class PartitionStateMachine(config: KafkaConfig, Seq.empty } createResponses.foreach { createResponse => - val code = Code.get(createResponse.rc) - val partition = createResponse.ctx.asInstanceOf[TopicAndPartition] + val code = createResponse.resultCode + val partition = createResponse.ctx.get.asInstanceOf[TopicAndPartition] val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition) if (code == Code.OK) { controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) @@ -275,9 +276,7 @@ class PartitionStateMachine(config: KafkaConfig, * 3. Exceptions corresponding to failed elections that should not be retried. */ private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): - (Seq[TopicAndPartition], - Seq[TopicAndPartition], - Map[TopicAndPartition, Exception]) = { + (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = { val getDataResponses = try { zkUtils.getTopicPartitionStatesRaw(partitions) } catch { @@ -287,20 +286,20 @@ class PartitionStateMachine(config: KafkaConfig, val failedElections = mutable.Map.empty[TopicAndPartition, Exception] val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicAndPartition, LeaderIsrAndControllerEpoch)] getDataResponses.foreach { getDataResponse => - val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition] + val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition] val currState = partitionState(partition) - if (Code.get(getDataResponse.rc) == Code.OK) { + if (getDataResponse.resultCode == Code.OK) { val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) if (leaderIsrAndControllerEpochOpt.isEmpty) { val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, exception) } leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get - } else if (Code.get(getDataResponse.rc) == Code.NONODE) { + } else if (getDataResponse.resultCode == Code.NONODE) { val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, exception) } else { - failedElections.put(partition, KeeperException.create(Code.get(getDataResponse.rc))) + failedElections.put(partition, getDataResponse.resultException.get) } } val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) => @@ -332,7 +331,8 @@ class PartitionStateMachine(config: KafkaConfig, } val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap - val (successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch) + val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr( + adjustedLeaderAndIsrs, controllerContext.epoch) successfulUpdates.foreach { case (partition, leaderAndIsr) => val replicas = controllerContext.partitionReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index f811612..4da1c7b 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -19,9 +19,9 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.{StateChangeFailedException, TopicAndPartition} import kafka.controller.Callbacks.CallbackBuilder +import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.server.KafkaConfig import kafka.utils.Logging -import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code import scala.collection.mutable @@ -292,7 +292,8 @@ class ReplicaStateMachine(config: KafkaConfig, val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId) leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr) } - val (successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch) + val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr( + adjustedLeaderAndIsrs, controllerContext.epoch) val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition => if (!topicDeletionManager.isPartitionToBeDeleted(partition)) { val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty") @@ -331,8 +332,8 @@ class ReplicaStateMachine(config: KafkaConfig, return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap) } getDataResponses.foreach { getDataResponse => - val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition] - if (Code.get(getDataResponse.rc) == Code.OK) { + val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition] + if (getDataResponse.resultCode == Code.OK) { val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) if (leaderIsrAndControllerEpochOpt.isEmpty) { partitionsWithNoLeaderAndIsrInZk += partition @@ -347,10 +348,10 @@ class ReplicaStateMachine(config: KafkaConfig, leaderAndIsrs.put(partition, leaderIsrAndControllerEpoch.leaderAndIsr) } } - } else if (Code.get(getDataResponse.rc) == Code.NONODE) { + } else if (getDataResponse.resultCode == Code.NONODE) { partitionsWithNoLeaderAndIsrInZk += partition } else { - failed.put(partition, KeeperException.create(Code.get(getDataResponse.rc))) + failed.put(partition, getDataResponse.resultException.get) } } (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap) http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/controller/ZookeeperClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala index e68c738..0009439 100644 --- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala +++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala @@ -23,10 +23,13 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} import kafka.utils.Logging import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} +import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} import org.apache.zookeeper.ZooKeeper.States import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper} +import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper} + +import scala.collection.JavaConverters._ /** * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. @@ -36,88 +39,106 @@ import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper} * @param connectionTimeoutMs connection timeout in milliseconds * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread. */ -class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, stateChangeHandler: StateChangeHandler) extends Logging { - this.logIdent = "[ZookeeperClient]: " +class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, + stateChangeHandler: StateChangeHandler) extends Logging { + this.logIdent = "[ZookeeperClient] " private val initializationLock = new ReentrantReadWriteLock() private val isConnectedOrExpiredLock = new ReentrantLock() private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() - private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]() - private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]() + private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala + private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala info(s"Initializing a new session to $connectString.") @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher) waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) /** - * Take an AsyncRequest and wait for its AsyncResponse. See handle(Seq[AsyncRequest]) for details. + * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details. * - * @param request a single AsyncRequest to wait on. - * @return the request's AsyncResponse. + * @param request a single request to send and wait on. + * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse). */ - def handle(request: AsyncRequest): AsyncResponse = { - handle(Seq(request)).head + def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = { + handleRequests(Seq(request)).head } /** - * Pipeline a sequence of AsyncRequests and wait for all of their AsyncResponses. + * Send a pipelined sequence of requests and wait for all of their responses. * * The watch flag on each outgoing request will be set if we've already registered a handler for the - * path associated with the AsyncRequest. + * path associated with the request. * - * @param requests a sequence of AsyncRequests to wait on. - * @return the AsyncResponses. + * @param requests a sequence of requests to send and wait on. + * @return the responses for the requests. If all requests have the same type, the responses will have the respective + * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype + * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]). */ - def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = inReadLock(initializationLock) { - import scala.collection.JavaConverters._ - if (requests.isEmpty) { - return Seq.empty - } - val countDownLatch = new CountDownLatch(requests.size) - val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size) - requests.foreach { - case CreateRequest(path, data, acl, createMode, ctx) => zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { - override def processResult(rc: Int, path: String, ctx: Any, name: String) = { - responseQueue.add(CreateResponse(rc, path, ctx, name)) - countDownLatch.countDown() - }}, ctx) - case DeleteRequest(path, version, ctx) => zooKeeper.delete(path, version, new VoidCallback { - override def processResult(rc: Int, path: String, ctx: Any) = { - responseQueue.add(DeleteResponse(rc, path, ctx)) - countDownLatch.countDown() - }}, ctx) - case ExistsRequest(path, ctx) => zooKeeper.exists(path, zNodeChangeHandlers.containsKey(path), new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = { - responseQueue.add(ExistsResponse(rc, path, ctx, stat)) - countDownLatch.countDown() - }}, ctx) - case GetDataRequest(path, ctx) => zooKeeper.getData(path, zNodeChangeHandlers.containsKey(path), new DataCallback { - override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat) = { - responseQueue.add(GetDataResponse(rc, path, ctx, data, stat)) - countDownLatch.countDown() - }}, ctx) - case SetDataRequest(path, data, version, ctx) => zooKeeper.setData(path, data, version, new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = { - responseQueue.add(SetDataResponse(rc, path, ctx, stat)) - countDownLatch.countDown() - }}, ctx) - case GetACLRequest(path, ctx) => zooKeeper.getACL(path, null, new ACLCallback { - override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = { - responseQueue.add(GetACLResponse(rc, path, ctx, Option(acl).map(_.asScala).orNull, stat)) - countDownLatch.countDown() - }}, ctx) - case SetACLRequest(path, acl, version, ctx) => zooKeeper.setACL(path, acl.asJava, version, new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = { - responseQueue.add(SetACLResponse(rc, path, ctx, stat)) - countDownLatch.countDown() - }}, ctx) - case GetChildrenRequest(path, ctx) => zooKeeper.getChildren(path, zNodeChildChangeHandlers.containsKey(path), new Children2Callback { - override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat) = { - responseQueue.add(GetChildrenResponse(rc, path, ctx, Option(children).map(_.asScala).orNull, stat)) + def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) { + if (requests.isEmpty) + Seq.empty + else { + val countDownLatch = new CountDownLatch(requests.size) + val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) + + requests.foreach { request => + send(request) { response => + responseQueue.add(response) countDownLatch.countDown() - }}, ctx) + } + } + countDownLatch.await() + responseQueue.asScala.toBuffer + } + } + + private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { + // Safe to cast as we always create a response of the right type + def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response]) + + request match { + case ExistsRequest(path, ctx) => + zooKeeper.exists(path, shouldWatch(request), new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) + case GetDataRequest(path, ctx) => + zooKeeper.getData(path, shouldWatch(request), new DataCallback { + override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = + callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat)) + }, ctx.orNull) + case GetChildrenRequest(path, ctx) => + zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { + override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit = + callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), + Option(children).map(_.asScala).getOrElse(Seq.empty), stat)) + }, ctx.orNull) + case CreateRequest(path, data, acl, createMode, ctx) => + zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { + override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit = + callback(CreateResponse(Code.get(rc), path, Option(ctx), name)) + }, ctx.orNull) + case SetDataRequest(path, data, version, ctx) => + zooKeeper.setData(path, data, version, new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) + case DeleteRequest(path, version, ctx) => + zooKeeper.delete(path, version, new VoidCallback { + override def processResult(rc: Int, path: String, ctx: Any): Unit = + callback(DeleteResponse(Code.get(rc), path, Option(ctx))) + }, ctx.orNull) + case GetAclRequest(path, ctx) => + zooKeeper.getACL(path, null, new ACLCallback { + override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = { + callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty), + stat)) + }}, ctx.orNull) + case SetAclRequest(path, acl, version, ctx) => + zooKeeper.setACL(path, acl.asJava, version, new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) } - countDownLatch.await() - responseQueue.asScala.toSeq } /** @@ -150,6 +171,14 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi info("Connected.") } + // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler + // may need to be updated. + private def shouldWatch(request: AsyncRequest): Boolean = request match { + case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path) + case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path) + case _ => throw new IllegalArgumentException(s"Request $request is not watchable") + } + /** * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher. * @@ -199,7 +228,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi info("Closed.") } - def sessionId = inReadLock(initializationLock) { + def sessionId: Long = inReadLock(initializationLock) { zooKeeper.getSessionId } @@ -231,29 +260,29 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi private object ZookeeperClientWatcher extends Watcher { override def process(event: WatchedEvent): Unit = { debug("Received event: " + event) - if (event.getPath == null) { - inLock(isConnectedOrExpiredLock) { - isConnectedOrExpiredCondition.signalAll() - } - if (event.getState == KeeperState.AuthFailed) { - info("Auth failed.") - stateChangeHandler.onAuthFailure() - } else if (event.getState == KeeperState.Expired) { - inWriteLock(initializationLock) { - info("Session expired.") - stateChangeHandler.beforeInitializingSession() - initialize() - stateChangeHandler.afterInitializingSession() + Option(event.getPath) match { + case None => + inLock(isConnectedOrExpiredLock) { + isConnectedOrExpiredCondition.signalAll() + } + if (event.getState == KeeperState.AuthFailed) { + info("Auth failed.") + stateChangeHandler.onAuthFailure() + } else if (event.getState == KeeperState.Expired) { + inWriteLock(initializationLock) { + info("Session expired.") + stateChangeHandler.beforeInitializingSession() + initialize() + stateChangeHandler.afterInitializingSession() + } + } + case Some(path) => + (event.getType: @unchecked) match { + case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange()) + case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation()) + case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion()) + case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange()) } - } - } else if (event.getType == EventType.NodeCreated) { - Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation()) - } else if (event.getType == EventType.NodeDeleted) { - Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion()) - } else if (event.getType == EventType.NodeDataChanged) { - Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange()) - } else if (event.getType == EventType.NodeChildrenChanged) { - Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange()) } } } @@ -279,33 +308,67 @@ trait ZNodeChildChangeHandler { } sealed trait AsyncRequest { - val path: String - val ctx: Any + /** + * This type member allows us to define methods that take requests and return responses with the correct types. + * See ``ZookeeperClient.handleRequests`` for example. + */ + type Response <: AsyncResponse + def path: String + def ctx: Option[Any] +} + +case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, + ctx: Option[Any] = None) extends AsyncRequest { + type Response = CreateResponse +} + +case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest { + type Response = DeleteResponse +} + +case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = ExistsResponse +} + +case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetDataResponse +} + +case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest { + type Response = SetDataResponse +} + +case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetAclResponse +} + +case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest { + type Response = SetAclResponse +} + +case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetChildrenResponse } -case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, ctx: Any) extends AsyncRequest -case class DeleteRequest(path: String, version: Int, ctx: Any) extends AsyncRequest -case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest -case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest -case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Any) extends AsyncRequest -case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest -case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) extends AsyncRequest -case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest sealed trait AsyncResponse { - val rc: Int - val path: String - val ctx: Any + def resultCode: Code + def path: String + def ctx: Option[Any] + + /** Return None if the result code is OK and KeeperException otherwise. */ + def resultException: Option[KeeperException] = + if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path)) } -case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) extends AsyncResponse -case class DeleteResponse(rc: Int, path: String, ctx: Any) extends AsyncResponse -case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse -case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat) extends AsyncResponse -case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse -case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], stat: Stat) extends AsyncResponse -case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse -case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: Seq[String], stat: Stat) extends AsyncResponse +case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse +case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse +case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse +case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse +case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse class ZookeeperClientException(message: String) extends RuntimeException(message) class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message) class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message) -class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message) \ No newline at end of file +class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message) http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 38e0b66..cc38667 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -24,7 +24,7 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.cluster._ import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition} import kafka.consumer.{ConsumerThreadId, TopicCount} -import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext} +import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext} import kafka.metrics.KafkaMetricsGroup import kafka.server.ConfigType import kafka.utils.ZkUtils._ http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 1214344..296f4a7 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -18,6 +18,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition +import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.log.LogConfig import kafka.server.KafkaConfig import kafka.utils.TestUtils @@ -82,7 +83,7 @@ class PartitionStateMachineTest extends JUnitSuite { val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) - .andReturn(Seq(CreateResponse(Code.OK.intValue(), null, partition, null))) + .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null))) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) @@ -116,7 +117,7 @@ class PartitionStateMachineTest extends JUnitSuite { val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) - .andReturn(Seq(CreateResponse(Code.NODEEXISTS.intValue(), null, partition, null))) + .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null))) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) @@ -150,13 +151,13 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, + .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) - .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) + .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) @@ -182,13 +183,13 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, + .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId)) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) - .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) + .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), isNew = false)) @@ -233,14 +234,15 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) + .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), + TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals())) .andReturn((Map(partition.topic -> LogConfig()), Map.empty)) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) - .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) + .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) @@ -284,7 +286,8 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andReturn(Seq(GetDataResponse(Code.NONODE.intValue(), null, partition, TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) + .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition), + TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala index 62c28a0..0afe7c2 100644 --- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala @@ -18,6 +18,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition +import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.zookeeper.KeeperException.Code @@ -178,10 +179,10 @@ class ReplicaStateMachineTest extends JUnitSuite { val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1) val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch) EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, + .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch)) - .andReturn(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty) + .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false)) http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd6c53/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala index 9f172f0..d7b46c7 100644 --- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala @@ -58,42 +58,42 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { @Test def testDeleteNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(deleteResponse.rc)) + val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode) } @Test def testDeleteExistingZNode(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] - assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) } @Test def testExistsNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(existsResponse.rc)) + val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode) } @Test def testExistsExistingZNode(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse] - assertEquals("Response code for exists should be OK", Code.OK, Code.get(existsResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath)) + assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode) } @Test def testGetDataNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getDataResponse.rc)) + val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode) } @Test @@ -101,18 +101,19 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { import scala.collection.JavaConverters._ val data = bytes val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] - assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, + CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data) } @Test def testSetDataNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setDataResponse.rc)) + val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) + assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode) } @Test @@ -120,56 +121,58 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { import scala.collection.JavaConverters._ val data = bytes val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, data, -1, null)).asInstanceOf[SetDataResponse] - assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc)) - val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] - assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1)) + assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) + val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data) } @Test - def testGetACLNonExistentZNode(): Unit = { + def testGetAclNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getACLResponse.rc)) + val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode) } @Test - def testGetACLExistingZNode(): Unit = { + def testGetAclExistingZNode(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse] - assertEquals("Response code for getACL should be OK", Code.OK, Code.get(getACLResponse.rc)) - assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getACLResponse.acl) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath)) + assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode) + assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl) } @Test - def testSetACLNonExistentZNode(): Unit = { + def testSetAclNonExistentZNode(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1, null)).asInstanceOf[SetACLResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setACLResponse.rc)) + val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1)) + assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode) } @Test def testGetChildrenNonExistentZNode(): Unit = { val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] - assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getChildrenResponse.rc)) + val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode) } @Test def testGetChildrenExistingZNode(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] - assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children) } @@ -181,15 +184,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { val child1Path = mockPath + "/" + child1 val child2Path = mockPath + "/" + child2 val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc)) - val createResponseChild2 = zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create child2 should be OK", Code.OK, Code.get(createResponseChild2.rc)) - - val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] - assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) + val createResponseChild2 = zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode) + + val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted) } @@ -197,15 +203,16 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { def testPipelinedGetData(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - val createResponses = createRequests.map(zookeeperClient.handle) - createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))) - val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null)) - val getDataResponses = zookeeperClient.handle(getDataRequests) - getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))) + val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + val createResponses = createRequests.map(zookeeperClient.handleRequest) + createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)) + val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x)) + val getDataResponses = zookeeperClient.handleRequests(getDataRequests) + getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK, + getDataResponse.resultCode)) getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) => - assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) - assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data))) + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) + assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.data))) } } @@ -213,14 +220,15 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { def testMixedPipeline(): Unit = { import scala.collection.JavaConverters._ val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) - val getDataRequest = GetDataRequest(mockPath, null) - val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, null) - val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest)) - assertEquals("Response code for getData should be OK", Code.OK, Code.get(responses.head.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getDataRequest = GetDataRequest(mockPath) + val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1) + val responses = zookeeperClient.handleRequests(Seq(getDataRequest, setDataRequest)) + assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode) assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data) - assertEquals("Response code for setData should be NONODE", Code.NONODE, Code.get(responses.last.rc)) + assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode) } @Test @@ -236,11 +244,11 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { } zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath, null) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null) - val responses = zookeeperClient.handle(Seq(existsRequest, createRequest)) - assertEquals("Response code for exists should be NONODE", Code.NONODE, Code.get(responses.head.rc)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc)) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zookeeperClient.handleRequests(Seq(existsRequest, createRequest)) + assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) } @@ -257,13 +265,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { } zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath, null) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null) - val responses = zookeeperClient.handle(Seq(createRequest, existsRequest)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc)) - assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.head.rc)) - val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] - assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc)) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest)) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) + assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) + val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) } @@ -280,13 +288,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { } zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath, null) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null) - val responses = zookeeperClient.handle(Seq(createRequest, existsRequest)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc)) - assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.head.rc)) - val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse] - assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc)) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest)) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) + assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) + val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) + assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) } @@ -304,13 +312,13 @@ class ZookeeperClientTest extends ZooKeeperTestHarness { val child1 = "child1" val child1Path = mockPath + "/" + child1 - val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler) - val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] - assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc)) - val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) - assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc)) + val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) + val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) }