Repository: kafka Updated Branches: refs/heads/trunk 97fc2ca49 -> e5e88f636 (forced update)
KAFKA-5501; introduce async ZookeeperClient Author: Onur Karaman <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #3427 from onurkaraman/KAFKA-5501 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5e88f63 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5e88f63 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5e88f63 Branch: refs/heads/trunk Commit: e5e88f636fa9857105324f62cc758b1fed3602bb Parents: 47ee8e9 Author: Onur Karaman <[email protected]> Authored: Wed Jul 26 08:36:05 2017 +0200 Committer: Jun Rao <[email protected]> Committed: Wed Jul 26 08:03:10 2017 +0100 ---------------------------------------------------------------------- .../kafka/controller/ZookeeperClient.scala | 286 ++++++++++++++++ .../kafka/controller/ZookeeperClientTest.scala | 329 +++++++++++++++++++ 2 files changed, 615 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/main/scala/kafka/controller/ZookeeperClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala new file mode 100644 index 0000000..7ffa511 --- /dev/null +++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit} + +import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} +import kafka.utils.Logging +import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} +import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} +import org.apache.zookeeper.ZooKeeper.States +import org.apache.zookeeper.data.{ACL, Stat} +import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper} + +/** + * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. + * + * @param connectString comma separated host:port pairs, each corresponding to a zk server + * @param sessionTimeoutMs session timeout in milliseconds + * @param connectionTimeoutMs connection timeout in milliseconds + * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread. 
+ */ +class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, stateChangeHandler: StateChangeHandler) extends Logging { + this.logIdent = "[ZookeeperClient]: " + private val initializationLock = new ReentrantReadWriteLock() + private val isConnectedOrExpiredLock = new ReentrantLock() + private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() + private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]() + private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]() + + info(s"Initializing a new session to $connectString.") + @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher) + waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) + + /** + * Take an AsyncRequest and wait for its AsyncResponse. + * @param request a single AsyncRequest to wait on. + * @return the request's AsyncResponse. + */ + def handle(request: AsyncRequest): AsyncResponse = { + handle(Seq(request)).head + } + + /** + * Pipeline a sequence of AsyncRequests and wait for all of their AsyncResponses. + * @param requests a sequence of AsyncRequests to wait on. + * @return the AsyncResponses. 
+   */
+  def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = inReadLock(initializationLock) {
+    import scala.collection.JavaConverters._
+    if (requests.isEmpty) {
+      // ArrayBlockingQueue rejects a capacity below 1 with an IllegalArgumentException, and an
+      // empty pipeline has nothing to wait for, so answer it with an empty response sequence.
+      Seq.empty
+    } else {
+      val countDownLatch = new CountDownLatch(requests.size)
+      val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size)
+
+      // Shared tail of every callback: record the response first, then release one latch permit,
+      // so the response is already queued by the time the waiting caller is woken up.
+      def complete(response: AsyncResponse): Unit = {
+        responseQueue.add(response)
+        countDownLatch.countDown()
+      }
+
+      requests.foreach {
+        case CreateRequest(path, data, acl, createMode, ctx) =>
+          zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
+              complete(CreateResponse(rc, path, ctx, name))
+          }, ctx)
+        case DeleteRequest(path, version, ctx) =>
+          zooKeeper.delete(path, version, new VoidCallback {
+            override def processResult(rc: Int, path: String, ctx: Any): Unit =
+              complete(DeleteResponse(rc, path, ctx))
+          }, ctx)
+        case ExistsRequest(path, ctx) =>
+          // A watch is requested only when a ZNodeChangeHandler is registered for the path.
+          zooKeeper.exists(path, zNodeChangeHandlers.containsKey(path), new StatCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+              complete(ExistsResponse(rc, path, ctx, stat))
+          }, ctx)
+        case GetDataRequest(path, ctx) =>
+          // A watch is requested only when a ZNodeChangeHandler is registered for the path.
+          zooKeeper.getData(path, zNodeChangeHandlers.containsKey(path), new DataCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
+              complete(GetDataResponse(rc, path, ctx, data, stat))
+          }, ctx)
+        case SetDataRequest(path, data, version, ctx) =>
+          zooKeeper.setData(path, data, version, new StatCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+              complete(SetDataResponse(rc, path, ctx, stat))
+          }, ctx)
+        case GetACLRequest(path, ctx) =>
+          zooKeeper.getACL(path, null, new ACLCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit =
+              // acl may be null (handled via Option); the response then carries null as well.
+              complete(GetACLResponse(rc, path, ctx, Option(acl).map(_.asScala).orNull, stat))
+          }, ctx)
+        case SetACLRequest(path, acl, version, ctx) =>
+          zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
+            override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+              complete(SetACLResponse(rc, path, ctx, stat))
+          }, ctx)
+        case GetChildrenRequest(path, ctx) =>
+          // A watch is requested only when a ZNodeChildChangeHandler is registered for the path.
+          zooKeeper.getChildren(path, zNodeChildChangeHandlers.containsKey(path), new Children2Callback {
+            override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
+              // children may be null (handled via Option); the response then carries null as well.
+              complete(GetChildrenResponse(rc, path, ctx, Option(children).map(_.asScala).orNull, stat))
+          }, ctx)
+      }
+      // Block until every callback has fired, then hand back the responses in arrival order.
+      countDownLatch.await()
+      responseQueue.asScala.toSeq
+    }
+  }
+
+  /**
+   * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state.
+   * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
+   * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
+ */ + def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) { + waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS) + } + + private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = { + info("Waiting until connected.") + var nanos = timeUnit.toNanos(timeout) + inLock(isConnectedOrExpiredLock) { + var state = zooKeeper.getState + while (!state.isConnected && state.isAlive) { + if (nanos <= 0) { + throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state") + } + nanos = isConnectedOrExpiredCondition.awaitNanos(nanos) + state = zooKeeper.getState + } + if (state == States.AUTH_FAILED) { + throw new ZookeeperClientAuthFailedException("Auth failed either before or while waiting for connection") + } else if (state == States.CLOSED) { + throw new ZookeeperClientExpiredException("Session expired either before or while waiting for connection") + } + } + info("Connected.") + } + + def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): ExistsResponse = { + registerZNodeChangeHandlers(Seq(zNodeChangeHandler)).head + } + + def registerZNodeChangeHandlers(handlers: Seq[ZNodeChangeHandler]): Seq[ExistsResponse] = { + handlers.foreach(handler => zNodeChangeHandlers.put(handler.path, handler)) + val asyncRequests = handlers.map(handler => ExistsRequest(handler.path, null)) + handle(asyncRequests).asInstanceOf[Seq[ExistsResponse]] + } + + def unregisterZNodeChangeHandler(path: String): Unit = { + zNodeChangeHandlers.remove(path) + } + + def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): GetChildrenResponse = { + registerZNodeChildChangeHandlers(Seq(zNodeChildChangeHandler)).head + } + + def registerZNodeChildChangeHandlers(handlers: Seq[ZNodeChildChangeHandler]): Seq[GetChildrenResponse] = { + handlers.foreach(handler => zNodeChildChangeHandlers.put(handler.path, handler)) + val asyncRequests = handlers.map(handler => GetChildrenRequest(handler.path, 
null)) + handle(asyncRequests).asInstanceOf[Seq[GetChildrenResponse]] + } + + def unregisterZNodeChildChangeHandler(path: String): Unit = { + zNodeChildChangeHandlers.remove(path) + } + + def close(): Unit = inWriteLock(initializationLock) { + info("Closing.") + zNodeChangeHandlers.clear() + zNodeChildChangeHandlers.clear() + zooKeeper.close() + info("Closed.") + } + + private def initialize(): Unit = { + if (!zooKeeper.getState.isAlive) { + info(s"Initializing a new session to $connectString.") + var now = System.currentTimeMillis() + val threshold = now + connectionTimeoutMs + while (now < threshold) { + try { + zooKeeper.close() + zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher) + waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS) + return + } catch { + case _: Exception => + now = System.currentTimeMillis() + if (now < threshold) { + Thread.sleep(1000) + now = System.currentTimeMillis() + } + } + } + info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}") + stateChangeHandler.onConnectionTimeout + } + } + + private object ZookeeperClientWatcher extends Watcher { + override def process(event: WatchedEvent): Unit = { + debug("Received event: " + event) + if (event.getPath == null) { + inLock(isConnectedOrExpiredLock) { + isConnectedOrExpiredCondition.signalAll() + } + if (event.getState == KeeperState.AuthFailed) { + info("Auth failed.") + stateChangeHandler.onAuthFailure + } else if (event.getState == KeeperState.Expired) { + inWriteLock(initializationLock) { + info("Session expired.") + stateChangeHandler.beforeInitializingSession + initialize() + stateChangeHandler.afterInitializingSession + } + } + } else if (event.getType == EventType.NodeCreated) { + Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation) + } else if (event.getType == EventType.NodeDeleted) { + Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion) + } else if 
(event.getType == EventType.NodeDataChanged) { + Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange) + } else if (event.getType == EventType.NodeChildrenChanged) { + Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange) + } + } + } +} + +trait StateChangeHandler { + def beforeInitializingSession: Unit + def afterInitializingSession: Unit + def onAuthFailure: Unit + def onConnectionTimeout: Unit +} + +trait ZNodeChangeHandler { + val path: String + def handleCreation: Unit + def handleDeletion: Unit + def handleDataChange: Unit +} + +trait ZNodeChildChangeHandler { + val path: String + def handleChildChange: Unit +} + +sealed trait AsyncRequest { + val path: String + val ctx: Any +} +case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, ctx: Any) extends AsyncRequest +case class DeleteRequest(path: String, version: Int, ctx: Any) extends AsyncRequest +case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest +case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest +case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Any) extends AsyncRequest +case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest +case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) extends AsyncRequest +case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest + +sealed trait AsyncResponse { + val rc: Int + val path: String + val ctx: Any +} +case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) extends AsyncResponse +case class DeleteResponse(rc: Int, path: String, ctx: Any) extends AsyncResponse +case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse +case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat) extends AsyncResponse +case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends 
AsyncResponse +case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], stat: Stat) extends AsyncResponse +case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse +case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: Seq[String], stat: Stat) extends AsyncResponse + +class ZookeeperClientException(message: String) extends RuntimeException(message) +class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message) +class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message) +class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala new file mode 100644 index 0000000..70a2409 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.controller + +import java.net.UnknownHostException +import java.nio.charset.StandardCharsets +import java.util.UUID +import java.util.concurrent.{CountDownLatch, TimeUnit} +import javax.security.auth.login.Configuration + +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.{CreateMode, ZooDefs} +import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue} +import org.junit.{After, Test} + +class ZookeeperClientTest extends ZooKeeperTestHarness { + private val mockPath = "/foo" + + @After + override def tearDown() { + super.tearDown() + System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + Configuration.setConfiguration(null) + } + + @Test(expected = classOf[UnknownHostException]) + def testUnresolvableConnectString(): Unit = { + new ZookeeperClient("-1", -1, -1, null) + } + + @Test(expected = classOf[ZookeeperClientTimeoutException]) + def testConnectionTimeout(): Unit = { + zookeeper.shutdown() + new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null) + } + + @Test + def testConnection(): Unit = { + new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + } + + @Test + def testDeleteNonExistentZNode(): Unit = { + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(deleteResponse.rc)) + } + + @Test + def testDeleteExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = 
zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] + assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc)) + } + + @Test + def testExistsNonExistentZNode(): Unit = { + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(existsResponse.rc)) + } + + @Test + def testExistsExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse] + assertEquals("Response code for exists should be OK", Code.OK, Code.get(existsResponse.rc)) + } + + @Test + def testGetDataNonExistentZNode(): Unit = { + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getDataResponse.rc)) + } + + @Test + def testGetDataExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val data = bytes + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] + assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) + assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data) + } + + @Test + def testSetDataNonExistentZNode(): Unit = { + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setDataResponse.rc)) + } + + @Test + def testSetDataExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val data = bytes + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, data, -1, null)).asInstanceOf[SetDataResponse] + assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc)) + val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse] + assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) + assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data) + } + + @Test + def testGetACLNonExistentZNode(): Unit = { + 
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getACLResponse.rc)) + } + + @Test + def testGetACLExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse] + assertEquals("Response code for getACL should be OK", Code.OK, Code.get(getACLResponse.rc)) + assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getACLResponse.acl) + } + + @Test + def testSetACLNonExistentZNode(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1, null)).asInstanceOf[SetACLResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setACLResponse.rc)) + } + + @Test + def testGetChildrenNonExistentZNode(): Unit = { + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] + assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getChildrenResponse.rc)) + } + + @Test + def testGetChildrenExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + 
val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] + assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc)) + assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children) + } + + @Test + def testGetChildrenExistingZNodeWithChildren(): Unit = { + import scala.collection.JavaConverters._ + val child1 = "child1" + val child2 = "child2" + val child1Path = mockPath + "/" + child1 + val child2Path = mockPath + "/" + child2 + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc)) + val createResponseChild2 = zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create child2 should be OK", Code.OK, Code.get(createResponseChild2.rc)) + + val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse] + assertEquals("Response code for getChildren should be 
OK", Code.OK, Code.get(getChildrenResponse.rc)) + assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted) + } + + @Test + def testPipelinedGetData(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + val createResponses = createRequests.map(zookeeperClient.handle) + createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))) + val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null)) + val getDataResponses = zookeeperClient.handle(getDataRequests) + getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))) + getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) => + assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc)) + assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data))) + } + } + + @Test + def testMixedPipeline(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + val getDataRequest = GetDataRequest(mockPath, null) + val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, null) + val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest)) + assertEquals("Response 
code for getData should be OK", Code.OK, Code.get(responses.head.rc)) + assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data) + assertEquals("Response code for setData should be NONODE", Code.NONODE, Code.get(responses.last.rc)) + } + + @Test + def testZNodeChangeHandlerForCreation(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleCreation = { + znodeChangeHandlerCountDownLatch.countDown() + } + override def handleDeletion = {} + override def handleDataChange = {} + override val path: String = mockPath + } + + zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChangeHandlerForDeletion(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleCreation = {} + override def handleDeletion = { + znodeChangeHandlerCountDownLatch.countDown() + } + override def handleDataChange = {} + override val path: String = mockPath + } + + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", 
Code.OK, Code.get(createResponse.rc)) + zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse] + assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc)) + assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChangeHandlerForDataChange(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleCreation = {} + override def handleDeletion = {} + override def handleDataChange = { + znodeChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse] + assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc)) + assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChildChangeHandlerForChildChange(): Unit = { + import scala.collection.JavaConverters._ + val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChildChangeHandler = new ZNodeChildChangeHandler { + override def handleChildChange = { + 
zNodeChildChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + val child1 = "child1" + val child1Path = mockPath + "/" + child1 + val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc)) + zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler) + val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)) + assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc)) + assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testStateChangeHandlerForAuthFailure(): Unit = { + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf") + val stateChangeHandlerCountDownLatch = new CountDownLatch(1) + val stateChangeHandler = new StateChangeHandler { + override def beforeInitializingSession = {} + override def afterInitializingSession = {} + override def onAuthFailure = { + stateChangeHandlerCountDownLatch.countDown() + } + override def onConnectionTimeout = {} + } + new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler) + assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8) +}
