Repository: kafka
Updated Branches:
  refs/heads/trunk 97fc2ca49 -> e5e88f636 (forced update)


KAFKA-5501; introduce async ZookeeperClient

Author: Onur Karaman <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #3427 from onurkaraman/KAFKA-5501


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5e88f63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5e88f63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5e88f63

Branch: refs/heads/trunk
Commit: e5e88f636fa9857105324f62cc758b1fed3602bb
Parents: 47ee8e9
Author: Onur Karaman <[email protected]>
Authored: Wed Jul 26 08:36:05 2017 +0200
Committer: Jun Rao <[email protected]>
Committed: Wed Jul 26 08:03:10 2017 +0100

----------------------------------------------------------------------
 .../kafka/controller/ZookeeperClient.scala      | 286 ++++++++++++++++
 .../kafka/controller/ZookeeperClientTest.scala  | 329 +++++++++++++++++++
 2 files changed, 615 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala 
b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
new file mode 100644
index 0000000..7ffa511
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, 
CountDownLatch, TimeUnit}
+
+import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
+import kafka.utils.Logging
+import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, 
DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
+
+/**
+  * ZookeeperClient is a zookeeper client that encourages pipelined requests 
to zookeeper.
+  *
+  * @param connectString comma separated host:port pairs, each corresponding 
to a zk server
+  * @param sessionTimeoutMs session timeout in milliseconds
+  * @param connectionTimeoutMs connection timeout in milliseconds
+  * @param stateChangeHandler state change handler callbacks called by the 
underlying zookeeper client's EventThread.
+  */
+class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, 
connectionTimeoutMs: Int, stateChangeHandler: StateChangeHandler) extends 
Logging {
+  this.logIdent = "[ZookeeperClient]: "
+  private val initializationLock = new ReentrantReadWriteLock()
+  private val isConnectedOrExpiredLock = new ReentrantLock()
+  private val isConnectedOrExpiredCondition = 
isConnectedOrExpiredLock.newCondition()
+  private val zNodeChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChangeHandler]()
+  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]()
+
+  info(s"Initializing a new session to $connectString.")
+  @volatile private var zooKeeper = new ZooKeeper(connectString, 
sessionTimeoutMs, ZookeeperClientWatcher)
+  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+  /**
+    * Take an AsyncRequest and wait for its AsyncResponse.
+    * @param request a single AsyncRequest to wait on.
+    * @return the request's AsyncReponse.
+    */
+  def handle(request: AsyncRequest): AsyncResponse = {
+    handle(Seq(request)).head
+  }
+
+  /**
+    * Pipeline a sequence of AsyncRequests and wait for all of their 
AsyncResponses.
+    * @param requests a sequence of AsyncRequests to wait on.
+    * @return the AsyncResponses.
+    */
+  def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = 
inReadLock(initializationLock) {
+    import scala.collection.JavaConverters._
+    val countDownLatch = new CountDownLatch(requests.size)
+    val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size)
+    requests.foreach {
+      case CreateRequest(path, data, acl, createMode, ctx) => 
zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, name: 
String) = {
+          responseQueue.add(CreateResponse(rc, path, ctx, name))
+          countDownLatch.countDown()
+        }}, ctx)
+      case DeleteRequest(path, version, ctx) => zooKeeper.delete(path, 
version, new VoidCallback {
+        override def processResult(rc: Int, path: String, ctx: Any) = {
+          responseQueue.add(DeleteResponse(rc, path, ctx))
+          countDownLatch.countDown()
+        }}, ctx)
+      case ExistsRequest(path, ctx) => zooKeeper.exists(path, 
zNodeChangeHandlers.containsKey(path), new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
+          responseQueue.add(ExistsResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetDataRequest(path, ctx) => zooKeeper.getData(path, 
zNodeChangeHandlers.containsKey(path), new DataCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, data: 
Array[Byte], stat: Stat) = {
+          responseQueue.add(GetDataResponse(rc, path, ctx, data, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetDataRequest(path, data, version, ctx) => zooKeeper.setData(path, 
data, version, new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
+          responseQueue.add(SetDataResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetACLRequest(path, ctx) => zooKeeper.getACL(path, null, new 
ACLCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, acl: 
java.util.List[ACL], stat: Stat): Unit = {
+          responseQueue.add(GetACLResponse(rc, path, ctx, 
Option(acl).map(_.asScala).orNull, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetACLRequest(path, acl, version, ctx) => zooKeeper.setACL(path, 
acl.asJava, version, new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: 
Stat) = {
+          responseQueue.add(SetACLResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetChildrenRequest(path, ctx) => zooKeeper.getChildren(path, 
zNodeChildChangeHandlers.containsKey(path), new Children2Callback {
+        override def processResult(rc: Int, path: String, ctx: Any, children: 
java.util.List[String], stat: Stat) = {
+          responseQueue.add(GetChildrenResponse(rc, path, ctx, 
Option(children).map(_.asScala).orNull, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+    }
+    countDownLatch.await()
+    responseQueue.asScala.toSeq
+  }
+
+  /**
+    * Wait indefinitely until the underlying zookeeper client to reaches the 
CONNECTED state.
+    * @throws ZookeeperClientAuthFailedException if the authentication failed 
either before or while waiting for connection.
+    * @throws ZookeeperClientExpiredException if the session expired either 
before or while waiting for connection.
+    */
+  def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
+    waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
+  }
+
+  private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
+    info("Waiting until connected.")
+    var nanos = timeUnit.toNanos(timeout)
+    inLock(isConnectedOrExpiredLock) {
+      var state = zooKeeper.getState
+      while (!state.isConnected && state.isAlive) {
+        if (nanos <= 0) {
+          throw new ZookeeperClientTimeoutException(s"Timed out waiting for 
connection while in state: $state")
+        }
+        nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
+        state = zooKeeper.getState
+      }
+      if (state == States.AUTH_FAILED) {
+        throw new ZookeeperClientAuthFailedException("Auth failed either 
before or while waiting for connection")
+      } else if (state == States.CLOSED) {
+        throw new ZookeeperClientExpiredException("Session expired either 
before or while waiting for connection")
+      }
+    }
+    info("Connected.")
+  }
+
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): 
ExistsResponse = {
+    registerZNodeChangeHandlers(Seq(zNodeChangeHandler)).head
+  }
+
+  def registerZNodeChangeHandlers(handlers: Seq[ZNodeChangeHandler]): 
Seq[ExistsResponse] = {
+    handlers.foreach(handler => zNodeChangeHandlers.put(handler.path, handler))
+    val asyncRequests = handlers.map(handler => ExistsRequest(handler.path, 
null))
+    handle(asyncRequests).asInstanceOf[Seq[ExistsResponse]]
+  }
+
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zNodeChangeHandlers.remove(path)
+  }
+
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: 
ZNodeChildChangeHandler): GetChildrenResponse = {
+    registerZNodeChildChangeHandlers(Seq(zNodeChildChangeHandler)).head
+  }
+
+  def registerZNodeChildChangeHandlers(handlers: 
Seq[ZNodeChildChangeHandler]): Seq[GetChildrenResponse] = {
+    handlers.foreach(handler => zNodeChildChangeHandlers.put(handler.path, 
handler))
+    val asyncRequests = handlers.map(handler => 
GetChildrenRequest(handler.path, null))
+    handle(asyncRequests).asInstanceOf[Seq[GetChildrenResponse]]
+  }
+
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zNodeChildChangeHandlers.remove(path)
+  }
+
+  def close(): Unit = inWriteLock(initializationLock) {
+    info("Closing.")
+    zNodeChangeHandlers.clear()
+    zNodeChildChangeHandlers.clear()
+    zooKeeper.close()
+    info("Closed.")
+  }
+
+  private def initialize(): Unit = {
+    if (!zooKeeper.getState.isAlive) {
+      info(s"Initializing a new session to $connectString.")
+      var now = System.currentTimeMillis()
+      val threshold = now + connectionTimeoutMs
+      while (now < threshold) {
+        try {
+          zooKeeper.close()
+          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZookeeperClientWatcher)
+          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
+          return
+        } catch {
+          case _: Exception =>
+            now = System.currentTimeMillis()
+            if (now < threshold) {
+              Thread.sleep(1000)
+              now = System.currentTimeMillis()
+            }
+        }
+      }
+      info(s"Timed out waiting for connection during session initialization 
while in state: ${zooKeeper.getState}")
+      stateChangeHandler.onConnectionTimeout
+    }
+  }
+
+  private object ZookeeperClientWatcher extends Watcher {
+    override def process(event: WatchedEvent): Unit = {
+      debug("Received event: " + event)
+      if (event.getPath == null) {
+        inLock(isConnectedOrExpiredLock) {
+          isConnectedOrExpiredCondition.signalAll()
+        }
+        if (event.getState == KeeperState.AuthFailed) {
+          info("Auth failed.")
+          stateChangeHandler.onAuthFailure
+        } else if (event.getState == KeeperState.Expired) {
+          inWriteLock(initializationLock) {
+            info("Session expired.")
+            stateChangeHandler.beforeInitializingSession
+            initialize()
+            stateChangeHandler.afterInitializingSession
+          }
+        }
+      } else if (event.getType == EventType.NodeCreated) {
+        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation)
+      } else if (event.getType == EventType.NodeDeleted) {
+        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion)
+      } else if (event.getType == EventType.NodeDataChanged) {
+        
Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange)
+      } else if (event.getType == EventType.NodeChildrenChanged) {
+        
Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange)
+      }
+    }
+  }
+}
+
+trait StateChangeHandler {
+  def beforeInitializingSession: Unit
+  def afterInitializingSession: Unit
+  def onAuthFailure: Unit
+  def onConnectionTimeout: Unit
+}
+
+trait ZNodeChangeHandler {
+  val path: String
+  def handleCreation: Unit
+  def handleDeletion: Unit
+  def handleDataChange: Unit
+}
+
+trait ZNodeChildChangeHandler {
+  val path: String
+  def handleChildChange: Unit
+}
+
+sealed trait AsyncRequest {
+  val path: String
+  val ctx: Any
+}
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], 
createMode: CreateMode, ctx: Any) extends AsyncRequest
+case class DeleteRequest(path: String, version: Int, ctx: Any) extends 
AsyncRequest
+case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest
+case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: 
Any) extends AsyncRequest
+case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) 
extends AsyncRequest
+case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest
+
+sealed trait AsyncResponse {
+  val rc: Int
+  val path: String
+  val ctx: Any
+}
+case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) 
extends AsyncResponse
+case class DeleteResponse(rc: Int, path: String, ctx: Any) extends 
AsyncResponse
+case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends 
AsyncResponse
+case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], 
stat: Stat) extends AsyncResponse
+case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) 
extends AsyncResponse
+case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], 
stat: Stat) extends AsyncResponse
+case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends 
AsyncResponse
+case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: 
Seq[String], stat: Stat) extends AsyncResponse
+
+class ZookeeperClientException(message: String) extends 
RuntimeException(message)
+class ZookeeperClientExpiredException(message: String) extends 
ZookeeperClientException(message)
+class ZookeeperClientAuthFailedException(message: String) extends 
ZookeeperClientException(message)
+class ZookeeperClientTimeoutException(message: String) extends 
ZookeeperClientException(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
new file mode 100644
index 0000000..70a2409
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import java.net.UnknownHostException
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
+
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.{After, Test}
+
+class ZookeeperClientTest extends ZooKeeperTestHarness {
+  private val mockPath = "/foo"
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+
+  @Test(expected = classOf[UnknownHostException])
+  def testUnresolvableConnectString(): Unit = {
+    new ZookeeperClient("-1", -1, -1, null)
+  }
+
+  @Test(expected = classOf[ZookeeperClientTimeoutException])
+  def testConnectionTimeout(): Unit = {
+    zookeeper.shutdown()
+    new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 
100, null)
+  }
+
+  @Test
+  def testConnection(): Unit = {
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+  }
+
+  @Test
+  def testDeleteNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testDeleteExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, 
Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testExistsNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, 
null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testExistsExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, 
null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code for exists should be OK", Code.OK, 
Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testGetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getDataResponse.rc))
+  }
+
+  @Test
+  def testGetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match created znode data", 
data, getDataResponse.data)
+  }
+
+  @Test
+  def testSetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(setDataResponse.rc))
+  }
+
+  @Test
+  def testSetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
data, -1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, 
Code.get(setDataResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, 
null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match setData's data", data, 
getDataResponse.data)
+  }
+
+  @Test
+  def testGetACLNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, 
null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getACLResponse.rc))
+  }
+
+  @Test
+  def testGetACLExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, 
null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code for getACL should be OK", Code.OK, 
Code.get(getACLResponse.rc))
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getACLResponse.acl)
+  }
+
+  @Test
+  def testSetACLNonExistentZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1, null)).asInstanceOf[SetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(setACLResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, 
Code.get(getChildrenResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return no children", Seq.empty[String], 
getChildrenResponse.children)
+  }
+
+  @Test
+  def testGetChildrenExistingZNodeWithChildren(): Unit = {
+    import scala.collection.JavaConverters._
+    val child1 = "child1"
+    val child2 = "child2"
+    val child1Path = mockPath + "/" + child1
+    val child2Path = mockPath + "/" + child2
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val createResponseChild1 = 
zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
Code.get(createResponseChild1.rc))
+    val createResponseChild2 = 
zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child2 should be OK", Code.OK, 
Code.get(createResponseChild2.rc))
+
+    val getChildrenResponse = 
zookeeperClient.handle(GetChildrenRequest(mockPath, 
null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, 
Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return two children", Seq(child1, 
child2), getChildrenResponse.children.sorted)
+  }
+
+  @Test
+  def testPipelinedGetData(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 
2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, 
CreateMode.PERSISTENT, null))
+    val createResponses = createRequests.map(zookeeperClient.handle)
+    createResponses.foreach(createResponse => assertEquals("Response code for 
create should be OK", Code.OK, Code.get(createResponse.rc)))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null))
+    val getDataResponses = zookeeperClient.handle(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code 
for getData should be OK", Code.OK, Code.get(getDataResponse.rc)))
+    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
+      assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(getDataResponse.rc))
+      assertEquals("Data for getData should match", ((i + 1) * 2), 
Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data)))
+    }
+  }
+
+  @Test
+  def testMixedPipeline(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    val getDataRequest = GetDataRequest(mockPath, null)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, 
null)
+    val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, 
Code.get(responses.head.rc))
+    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], 
responses.head.asInstanceOf[GetDataResponse].data)
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, 
Code.get(responses.last.rc))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForCreation(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDeletion = {}
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    assertTrue("Failed to receive create notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDeletion(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, 
null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, 
Code.get(deleteResponse.rc))
+    assertTrue("Failed to receive delete notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {}
+      override def handleDataChange = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, 
Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, 
Code.get(setDataResponse.rc))
+    assertTrue("Failed to receive data change notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChildChangeHandlerForChildChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null)
+    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
+      override def handleChildChange = {
+        zNodeChildChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val child1 = "child1"
+    val child1Path = mockPath + "/" + child1
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, 
Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, 
null))
+    assertEquals("Response code for create should be OK", Code.OK, 
Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val createResponseChild1 = 
zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], 
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, 
Code.get(createResponseChild1.rc))
+    assertTrue("Failed to receive child change notification", 
zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testStateChangeHandlerForAuthFailure(): Unit = {
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, 
"no-such-file-exists.conf")
+    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override def beforeInitializingSession = {}
+      override def afterInitializingSession = {}
+      override def onAuthFailure = {
+        stateChangeHandlerCountDownLatch.countDown()
+      }
+      override def onConnectionTimeout = {}
+    }
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
stateChangeHandler)
+    assertTrue("Failed to receive auth failed notification", 
stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
+}

Reply via email to