http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZookeeperClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala deleted file mode 100644 index 0009439..0000000 --- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.controller - -import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit} - -import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} -import kafka.utils.Logging -import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} -import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} -import org.apache.zookeeper.ZooKeeper.States -import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper} - -import scala.collection.JavaConverters._ - -/** - * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. - * - * @param connectString comma separated host:port pairs, each corresponding to a zk server - * @param sessionTimeoutMs session timeout in milliseconds - * @param connectionTimeoutMs connection timeout in milliseconds - * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread. 
- */ -class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, - stateChangeHandler: StateChangeHandler) extends Logging { - this.logIdent = "[ZookeeperClient] " - private val initializationLock = new ReentrantReadWriteLock() - private val isConnectedOrExpiredLock = new ReentrantLock() - private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() - private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala - private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala - - info(s"Initializing a new session to $connectString.") - @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher) - waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) - - /** - * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details. - * - * @param request a single request to send and wait on. - * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse). - */ - def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = { - handleRequests(Seq(request)).head - } - - /** - * Send a pipelined sequence of requests and wait for all of their responses. - * - * The watch flag on each outgoing request will be set if we've already registered a handler for the - * path associated with the request. - * - * @param requests a sequence of requests to send and wait on. - * @return the responses for the requests. If all requests have the same type, the responses will have the respective - * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype - * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]). 
- */ - def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) { - if (requests.isEmpty) - Seq.empty - else { - val countDownLatch = new CountDownLatch(requests.size) - val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) - - requests.foreach { request => - send(request) { response => - responseQueue.add(response) - countDownLatch.countDown() - } - } - countDownLatch.await() - responseQueue.asScala.toBuffer - } - } - - private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { - // Safe to cast as we always create a response of the right type - def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response]) - - request match { - case ExistsRequest(path, ctx) => - zooKeeper.exists(path, shouldWatch(request), new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = - callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat)) - }, ctx.orNull) - case GetDataRequest(path, ctx) => - zooKeeper.getData(path, shouldWatch(request), new DataCallback { - override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = - callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat)) - }, ctx.orNull) - case GetChildrenRequest(path, ctx) => - zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { - override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit = - callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), - Option(children).map(_.asScala).getOrElse(Seq.empty), stat)) - }, ctx.orNull) - case CreateRequest(path, data, acl, createMode, ctx) => - zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { - override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit = - callback(CreateResponse(Code.get(rc), path, Option(ctx), 
name)) - }, ctx.orNull) - case SetDataRequest(path, data, version, ctx) => - zooKeeper.setData(path, data, version, new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = - callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat)) - }, ctx.orNull) - case DeleteRequest(path, version, ctx) => - zooKeeper.delete(path, version, new VoidCallback { - override def processResult(rc: Int, path: String, ctx: Any): Unit = - callback(DeleteResponse(Code.get(rc), path, Option(ctx))) - }, ctx.orNull) - case GetAclRequest(path, ctx) => - zooKeeper.getACL(path, null, new ACLCallback { - override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = { - callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty), - stat)) - }}, ctx.orNull) - case SetAclRequest(path, acl, version, ctx) => - zooKeeper.setACL(path, acl.asJava, version, new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = - callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat)) - }, ctx.orNull) - } - } - - /** - * Wait indefinitely until the underlying zookeeper client to reaches the CONNECTED state. - * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection. - * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection. 
- */ - def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) { - waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS) - } - - private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = { - info("Waiting until connected.") - var nanos = timeUnit.toNanos(timeout) - inLock(isConnectedOrExpiredLock) { - var state = zooKeeper.getState - while (!state.isConnected && state.isAlive) { - if (nanos <= 0) { - throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state") - } - nanos = isConnectedOrExpiredCondition.awaitNanos(nanos) - state = zooKeeper.getState - } - if (state == States.AUTH_FAILED) { - throw new ZookeeperClientAuthFailedException("Auth failed either before or while waiting for connection") - } else if (state == States.CLOSED) { - throw new ZookeeperClientExpiredException("Session expired either before or while waiting for connection") - } - } - info("Connected.") - } - - // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler - // may need to be updated. - private def shouldWatch(request: AsyncRequest): Boolean = request match { - case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path) - case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path) - case _ => throw new IllegalArgumentException(s"Request $request is not watchable") - } - - /** - * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher. - * - * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) - * with either a GetDataRequest or ExistsRequest. - * - * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest. 
- * - * @param zNodeChangeHandler the handler to register - */ - def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = { - zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler) - } - - /** - * Unregister the handler from ZookeeperClient. This is just a local operation. - * @param path the path of the handler to unregister - */ - def unregisterZNodeChangeHandler(path: String): Unit = { - zNodeChangeHandlers.remove(path) - } - - /** - * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher. - * - * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest. - * - * @param zNodeChildChangeHandler the handler to register - */ - def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = { - zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler) - } - - /** - * Unregister the handler from ZookeeperClient. This is just a local operation. 
- * @param path the path of the handler to unregister - */ - def unregisterZNodeChildChangeHandler(path: String): Unit = { - zNodeChildChangeHandlers.remove(path) - } - - def close(): Unit = inWriteLock(initializationLock) { - info("Closing.") - zNodeChangeHandlers.clear() - zNodeChildChangeHandlers.clear() - zooKeeper.close() - info("Closed.") - } - - def sessionId: Long = inReadLock(initializationLock) { - zooKeeper.getSessionId - } - - private def initialize(): Unit = { - if (!zooKeeper.getState.isAlive) { - info(s"Initializing a new session to $connectString.") - var now = System.currentTimeMillis() - val threshold = now + connectionTimeoutMs - while (now < threshold) { - try { - zooKeeper.close() - zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher) - waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS) - return - } catch { - case _: Exception => - now = System.currentTimeMillis() - if (now < threshold) { - Thread.sleep(1000) - now = System.currentTimeMillis() - } - } - } - info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}") - stateChangeHandler.onReconnectionTimeout() - } - } - - private object ZookeeperClientWatcher extends Watcher { - override def process(event: WatchedEvent): Unit = { - debug("Received event: " + event) - Option(event.getPath) match { - case None => - inLock(isConnectedOrExpiredLock) { - isConnectedOrExpiredCondition.signalAll() - } - if (event.getState == KeeperState.AuthFailed) { - info("Auth failed.") - stateChangeHandler.onAuthFailure() - } else if (event.getState == KeeperState.Expired) { - inWriteLock(initializationLock) { - info("Session expired.") - stateChangeHandler.beforeInitializingSession() - initialize() - stateChangeHandler.afterInitializingSession() - } - } - case Some(path) => - (event.getType: @unchecked) match { - case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange()) - case 
EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation()) - case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion()) - case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange()) - } - } - } - } -} - -trait StateChangeHandler { - def beforeInitializingSession(): Unit = {} - def afterInitializingSession(): Unit = {} - def onAuthFailure(): Unit = {} - def onReconnectionTimeout(): Unit = {} -} - -trait ZNodeChangeHandler { - val path: String - def handleCreation(): Unit = {} - def handleDeletion(): Unit = {} - def handleDataChange(): Unit = {} -} - -trait ZNodeChildChangeHandler { - val path: String - def handleChildChange(): Unit = {} -} - -sealed trait AsyncRequest { - /** - * This type member allows us to define methods that take requests and return responses with the correct types. - * See ``ZookeeperClient.handleRequests`` for example. - */ - type Response <: AsyncResponse - def path: String - def ctx: Option[Any] -} - -case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, - ctx: Option[Any] = None) extends AsyncRequest { - type Response = CreateResponse -} - -case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest { - type Response = DeleteResponse -} - -case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = ExistsResponse -} - -case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetDataResponse -} - -case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest { - type Response = SetDataResponse -} - -case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetAclResponse -} - -case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends 
AsyncRequest { - type Response = SetAclResponse -} - -case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetChildrenResponse -} - -sealed trait AsyncResponse { - def resultCode: Code - def path: String - def ctx: Option[Any] - - /** Return None if the result code is OK and KeeperException otherwise. */ - def resultException: Option[KeeperException] = - if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path)) -} -case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse -case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse -case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse -case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse -case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse -case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse -case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse -case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse - -class ZookeeperClientException(message: String) extends RuntimeException(message) -class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message) -class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message) -class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index bd7f023..f1e2fc2 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,14 +23,14 @@ import java.util.concurrent._ import com.yammer.metrics.core.Gauge import kafka.common.KafkaException -import kafka.controller.KafkaControllerZkUtils import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _} import kafka.utils._ +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time -import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException} +import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException} import scala.collection.JavaConverters._ import scala.collection._ @@ -102,7 +102,6 @@ class LogManager(logDirs: Seq[File], loadLogs() - // public, so we can access this from kafka.admin.DeleteTopicTest val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) @@ -888,7 +887,7 @@ object LogManager { def apply(config: KafkaConfig, initialOfflineDirs: Seq[String], - zkUtils: KafkaControllerZkUtils, + zkClient: KafkaZkClient, brokerState: BrokerState, kafkaScheduler: KafkaScheduler, time: Time, @@ -897,7 +896,7 @@ object LogManager { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) - val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps) + val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps) if (!failed.isEmpty) throw failed.head._2 // read the log configurations from zookeeper 
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 101e646..d576206 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -27,7 +27,7 @@ import com.yammer.metrics.core.Gauge import kafka.api.KAFKA_0_9_0 import kafka.cluster.Broker import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException} -import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeHandler, ZookeeperClient} +import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.{LogConfig, LogManager} @@ -36,6 +36,8 @@ import kafka.network.SocketServer import kafka.security.CredentialProvider import kafka.security.auth.Authorizer import kafka.utils._ +import kafka.zk.KafkaZkClient +import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient} import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.ClusterResourceListeners import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} @@ -135,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var quotaManagers: QuotaFactory.QuotaManagers = null var zkUtils: ZkUtils = null - var kafkaControllerZkUtils: KafkaControllerZkUtils = null + private var zkClient: KafkaZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap @@ -219,7 +221,7 @@ class KafkaServer(val 
config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs, + val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, new StateChangeHandler { override def onReconnectionTimeout(): Unit = { error("Reconnection timeout.") @@ -233,10 +235,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP override def beforeInitializingSession(): Unit = kafkaController.expire() }) - kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure) + zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure) /* start log manager */ - logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) + logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) logManager.startup() metadataCache = new MetadataCache(config.brokerId) @@ -250,7 +252,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP replicaManager.startup() /* start kafka controller */ - kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix) + kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix) kafkaController.startup() adminManager = new AdminManager(config, metrics, metadataCache, zkUtils) @@ -561,8 +563,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP CoreUtils.swallow(kafkaController.shutdown()) if (zkUtils != null) CoreUtils.swallow(zkUtils.close()) - if (kafkaControllerZkUtils != null) - CoreUtils.swallow(kafkaControllerZkUtils.close()) + if (zkClient != null) + CoreUtils.swallow(zkClient.close()) if (metrics != null) 
CoreUtils.swallow(metrics.close()) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/Json.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index a916875..ad40c49 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -16,9 +16,12 @@ */ package kafka.utils +import java.nio.charset.StandardCharsets + import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper import kafka.utils.json.JsonValue + import scala.collection._ /** @@ -36,6 +39,13 @@ object Json { catch { case _: JsonProcessingException => None } /** + * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON. + */ + def parseBytes(input: Array[Byte]): Option[JsonValue] = + try Option(mapper.readTree(input)).map(JsonValue(_)) + catch { case _: JsonProcessingException => None } + + /** * Encode an object into a JSON string. This method accepts any type T where * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T] * Any other type will result in an exception. @@ -59,4 +69,13 @@ object Json { } } + /** + * Encode an object into a JSON value in bytes. This method accepts any type T where + * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T] + * Any other type will result in an exception. + * + * This method does not properly handle non-ascii characters. 
+ */ + def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8) + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index cc38667..60c2adf 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -252,6 +252,9 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time) } } +/** + * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead. + */ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, val zkConnection: ZkConnection, val isSecure: Boolean) extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/KafkaZkClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala new file mode 100644 index 0000000..0e48d51 --- /dev/null +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.zk + +import java.util.Properties + +import kafka.api.LeaderAndIsr +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.log.LogConfig +import kafka.server.ConfigType +import kafka.utils._ +import kafka.zookeeper._ +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.{CreateMode, KeeperException} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]]. + * + * This performs better than [[kafka.utils.ZkUtils]] and should replace it completely, eventually. + * + * Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.) + * and returns instances of classes from the calling packages in some cases. This is not ideal, but it makes it + * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are + * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go. + */ +class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging { + import KafkaZkClient._ + + /** + * Gets topic partition states for the given partitions. + * @param partitions the partitions for which we want ot get states. + * @return sequence of GetDataResponses whose contexts are the partitions they are associated with. 
+ */ + def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = { + val getDataRequests = partitions.map { partition => + GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition)) + } + retryRequestsUntilConnected(getDataRequests) + } + + /** + * Sets topic partition states for the given partitions. + * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. + * @return sequence of SetDataResponse whose contexts are the partitions they are associated with. + */ + def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = { + val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => + val path = TopicPartitionStateZNode.path(partition) + val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) + SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition)) + } + retryRequestsUntilConnected(setDataRequests.toSeq) + } + + /** + * Creates topic partition state znodes for the given partitions. + * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. + * @return sequence of CreateResponse whose contexts are the partitions they are associated with. 
+ */ + def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = { + createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq) + createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq) + val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => + val path = TopicPartitionStateZNode.path(partition) + val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) + CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition)) + } + retryRequestsUntilConnected(createRequests.toSeq) + } + + /** + * Sets the controller epoch conditioned on the given epochZkVersion. + * @param epoch the epoch to set + * @param epochZkVersion the expected version number of the epoch znode. + * @return SetDataResponse + */ + def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = { + val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion) + retryRequestUntilConnected(setDataRequest) + } + + /** + * Creates the controller epoch znode. + * @param epoch the epoch to set + * @return CreateResponse + */ + def createControllerEpochRaw(epoch: Int): CreateResponse = { + val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), + acls(ControllerEpochZNode.path), CreateMode.PERSISTENT) + retryRequestUntilConnected(createRequest) + } + + /** + * Try to update the partition states of multiple partitions in zookeeper. + * @param leaderAndIsrs The partition states to update. + * @param controllerEpoch The current controller epoch. + * @return UpdateLeaderAndIsrResult instance containing per partition results. 
+ */ + def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = { + val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr] + val updatesToRetry = mutable.Buffer.empty[TopicAndPartition] + val failed = mutable.Map.empty[TopicAndPartition, Exception] + val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) } + val setDataResponses = try { + setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs) + } catch { + case e: Exception => + leaderAndIsrs.keys.foreach(partition => failed.put(partition, e)) + return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) + } + setDataResponses.foreach { setDataResponse => + val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition] + if (setDataResponse.resultCode == Code.OK) { + val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion) + successfulUpdates.put(partition, updatedLeaderAndIsr) + } else if (setDataResponse.resultCode == Code.BADVERSION) { + updatesToRetry += partition + } else { + failed.put(partition, setDataResponse.resultException.get) + } + } + UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) + } + + /** + * Get log configs that merge local configs with topic-level configs in zookeeper. + * @param topics The topics to get log configs for. + * @param config The local configs. + * @return A tuple of two values: + * 1. The successfully gathered log configs + * 2. Exceptions corresponding to failed log config lookups. 
 */
def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
  (Map[String, LogConfig], Map[String, Exception]) = {
  val logConfigs = mutable.Map.empty[String, LogConfig]
  val failed = mutable.Map.empty[String, Exception]
  val configResponses = try {
    getTopicConfigs(topics)
  } catch {
    case e: Exception =>
      // The batched config fetch failed outright: report the same exception for every topic.
      topics.foreach(topic => failed.put(topic, e))
      return (logConfigs.toMap, failed.toMap)
  }
  configResponses.foreach { configResponse =>
    val topic = configResponse.ctx.get.asInstanceOf[String]
    if (configResponse.resultCode == Code.OK) {
      // Topic-level overrides found in zookeeper: layer them on top of the local defaults.
      val overrides = ConfigEntityZNode.decode(configResponse.data)
      val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
      logConfigs.put(topic, logConfig)
    } else if (configResponse.resultCode == Code.NONODE) {
      // No overrides znode for this topic: fall back to the local defaults alone.
      val logConfig = LogConfig.fromProps(config, new Properties)
      logConfigs.put(topic, logConfig)
    } else {
      failed.put(topic, configResponse.resultException.get)
    }
  }
  (logConfigs.toMap, failed.toMap)
}

/**
 * Gets all brokers in the cluster.
 * @return sequence of brokers in the cluster.
 */
def getAllBrokersInCluster: Seq[Broker] = {
  val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
  if (getChildrenResponse.resultCode == Code.OK) {
    // Child znode names under /brokers/ids are the broker ids themselves.
    val brokerIds = getChildrenResponse.children.map(_.toInt)
    val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
    getDataResponses.flatMap { getDataResponse =>
      val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
      if (getDataResponse.resultCode == Code.OK) {
        Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
      } else if (getDataResponse.resultCode == Code.NONODE) {
        // Znode vanished between listing and reading (presumably the broker deregistered) — drop it.
        None
      } else {
        throw getDataResponse.resultException.get
      }
    }
  } else if (getChildrenResponse.resultCode == Code.NONODE) {
    Seq.empty
  } else {
    throw getChildrenResponse.resultException.get
  }
}

/**
 * Gets all topics in the cluster.
 * @return sequence of topics in the cluster.
 */
def getAllTopicsInCluster: Seq[String] = {
  val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
  if (getChildrenResponse.resultCode == Code.OK) {
    getChildrenResponse.children
  } else if (getChildrenResponse.resultCode == Code.NONODE) {
    Seq.empty
  } else {
    throw getChildrenResponse.resultException.get
  }
}

/**
 * Sets the topic znode with the given assignment.
 * @param topic the topic whose assignment is being set.
 * @param assignment the partition to replica mapping to set for the given topic
 * @return SetDataResponse
 */
def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
  // -1 means an unconditional (version-agnostic) overwrite of the topic znode.
  val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
  retryRequestUntilConnected(setDataRequest)
}

/**
 * Gets the log dir event notifications as strings.
These strings are the znode names and not the absolute znode path.
 * @return sequence of znode names and not the absolute znode path.
 */
def getAllLogDirEventNotifications: Seq[String] = {
  val response = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
  response.resultCode match {
    case Code.OK => response.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
    case Code.NONODE => Seq.empty
    case _ => throw response.resultException.get
  }
}

/**
 * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
 * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
 * @return broker ids associated with the given log dir event notifications.
 */
def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
  val requests = sequenceNumbers.map(n => GetDataRequest(LogDirEventNotificationSequenceZNode.path(n)))
  retryRequestsUntilConnected(requests).flatMap { response =>
    response.resultCode match {
      case Code.OK => LogDirEventNotificationSequenceZNode.decode(response.data)
      case Code.NONODE => None // notification consumed/deleted concurrently — skip it
      case _ => throw response.resultException.get
    }
  }
}

/**
 * Deletes all log dir event notifications.
+ */ + def deleteLogDirEventNotifications(): Unit = { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) + if (getChildrenResponse.resultCode == Code.OK) { + deleteLogDirEventNotifications(getChildrenResponse.children) + } else if (getChildrenResponse.resultCode != Code.NONODE) { + throw getChildrenResponse.resultException.get + } + } + + /** + * Deletes the log dir event notifications associated with the given sequence numbers. + * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted. + */ + def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = { + val deleteRequests = sequenceNumbers.map { sequenceNumber => + DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1) + } + retryRequestsUntilConnected(deleteRequests) + } + + /** + * Gets the assignments for the given topics. + * @param topics the topics whose partitions we wish to get the assignments for. + * @return the replica assignment for each partition from the given topics. + */ + def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = { + val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic))) + val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq) + getDataResponses.flatMap { getDataResponse => + val topic = getDataResponse.ctx.get.asInstanceOf[String] + if (getDataResponse.resultCode == Code.OK) { + TopicZNode.decode(topic, getDataResponse.data) + } else if (getDataResponse.resultCode == Code.NONODE) { + Map.empty[TopicAndPartition, Seq[Int]] + } else { + throw getDataResponse.resultException.get + } + }.toMap + } + + /** + * Get all topics marked for deletion. + * @return sequence of topics marked for deletion. 
+ */ + def getTopicDeletions: Seq[String] = { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path)) + if (getChildrenResponse.resultCode == Code.OK) { + getChildrenResponse.children + } else if (getChildrenResponse.resultCode == Code.NONODE) { + Seq.empty + } else { + throw getChildrenResponse.resultException.get + } + } + + /** + * Remove the given topics from the topics marked for deletion. + * @param topics the topics to remove. + */ + def deleteTopicDeletions(topics: Seq[String]): Unit = { + val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1)) + retryRequestsUntilConnected(deleteRequests) + } + + /** + * Returns all reassignments. + * @return the reassignments for each partition. + */ + def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = { + val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + if (getDataResponse.resultCode == Code.OK) { + ReassignPartitionsZNode.decode(getDataResponse.data) + } else if (getDataResponse.resultCode == Code.NONODE) { + Map.empty[TopicAndPartition, Seq[Int]] + } else { + throw getDataResponse.resultException.get + } + } + + /** + * Sets the partition reassignment znode with the given reassignment. + * @param reassignment the reassignment to set on the reassignment znode. + * @return SetDataResponse + */ + def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = { + val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1) + retryRequestUntilConnected(setDataRequest) + } + + /** + * Creates the partition reassignment znode with the given reassignment. + * @param reassignment the reassignment to set on the reassignment znode. 
 * @return CreateResponse
 */
def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
  val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
    acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
  retryRequestUntilConnected(createRequest)
}

/**
 * Deletes the partition reassignment znode.
 */
def deletePartitionReassignment(): Unit = {
  // -1 version: delete unconditionally, whatever the current znode version is.
  val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
  retryRequestUntilConnected(deleteRequest)
}

/**
 * Gets topic partition states for the given partitions.
 * @param partitions the partitions for which we want to get states.
 * @return map containing LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
 */
def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
  val getDataResponses = getTopicPartitionStatesRaw(partitions)
  getDataResponses.flatMap { getDataResponse =>
    val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
    if (getDataResponse.resultCode == Code.OK) {
      // decode returns None for undecodable payloads, silently dropping that partition from the result.
      TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
    } else if (getDataResponse.resultCode == Code.NONODE) {
      None
    } else {
      throw getDataResponse.resultException.get
    }
  }.toMap
}

/**
 * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
 * @return sequence of znode names and not the absolute znode path.
+ */ + def getAllIsrChangeNotifications: Seq[String] = { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) + if (getChildrenResponse.resultCode == Code.OK) { + getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber) + } else if (getChildrenResponse.resultCode == Code.NONODE) { + Seq.empty + } else { + throw getChildrenResponse.resultException.get + } + } + + /** + * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions. + * @param sequenceNumbers the sequence numbers associated with the isr change notifications. + * @return partitions associated with the given isr change notifications. + */ + def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = { + val getDataRequests = sequenceNumbers.map { sequenceNumber => + GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber)) + } + val getDataResponses = retryRequestsUntilConnected(getDataRequests) + getDataResponses.flatMap { getDataResponse => + if (getDataResponse.resultCode == Code.OK) { + IsrChangeNotificationSequenceZNode.decode(getDataResponse.data) + } else if (getDataResponse.resultCode == Code.NONODE) { + None + } else { + throw getDataResponse.resultException.get + } + } + } + + /** + * Deletes all isr change notifications. + */ + def deleteIsrChangeNotifications(): Unit = { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) + if (getChildrenResponse.resultCode == Code.OK) { + deleteIsrChangeNotifications(getChildrenResponse.children) + } else if (getChildrenResponse.resultCode != Code.NONODE) { + throw getChildrenResponse.resultException.get + } + } + + /** + * Deletes the isr change notifications associated with the given sequence numbers. + * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted. 
+ */ + def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = { + val deleteRequests = sequenceNumbers.map { sequenceNumber => + DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1) + } + retryRequestsUntilConnected(deleteRequests) + } + + /** + * Gets the partitions marked for preferred replica election. + * @return sequence of partitions. + */ + def getPreferredReplicaElection: Set[TopicAndPartition] = { + val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + if (getDataResponse.resultCode == Code.OK) { + PreferredReplicaElectionZNode.decode(getDataResponse.data) + } else if (getDataResponse.resultCode == Code.NONODE) { + Set.empty[TopicAndPartition] + } else { + throw getDataResponse.resultException.get + } + } + + /** + * Deletes the preferred replica election znode. + */ + def deletePreferredReplicaElection(): Unit = { + val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1) + retryRequestUntilConnected(deleteRequest) + } + + /** + * Gets the controller id. + * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise. + */ + def getControllerId: Option[Int] = { + val getDataRequest = GetDataRequest(ControllerZNode.path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + if (getDataResponse.resultCode == Code.OK) { + ControllerZNode.decode(getDataResponse.data) + } else if (getDataResponse.resultCode == Code.NONODE) { + None + } else { + throw getDataResponse.resultException.get + } + } + + /** + * Deletes the controller znode. + */ + def deleteController(): Unit = { + val deleteRequest = DeleteRequest(ControllerZNode.path, -1) + retryRequestUntilConnected(deleteRequest) + } + + /** + * Gets the controller epoch. + * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise. 
 */
def getControllerEpoch: Option[(Int, Stat)] = {
  val getDataRequest = GetDataRequest(ControllerEpochZNode.path)
  val getDataResponse = retryRequestUntilConnected(getDataRequest)
  if (getDataResponse.resultCode == Code.OK) {
    // Return the Stat alongside the epoch so callers can do conditional updates on the znode version.
    val epoch = ControllerEpochZNode.decode(getDataResponse.data)
    Option(epoch, getDataResponse.stat)
  } else if (getDataResponse.resultCode == Code.NONODE) {
    None
  } else {
    throw getDataResponse.resultException.get
  }
}

/**
 * Recursively deletes the topic znode.
 * @param topic the topic whose topic znode we wish to delete.
 */
def deleteTopicZNode(topic: String): Unit = {
  deleteRecursive(TopicZNode.path(topic))
}

/**
 * Deletes the topic configs for the given topics.
 * @param topics the topics whose configs we wish to delete.
 */
def deleteTopicConfigs(topics: Seq[String]): Unit = {
  val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
  retryRequestsUntilConnected(deleteRequests)
}

/**
 * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data watcher
 * registrations on paths which might not even exist.
 *
 * @param zNodeChangeHandler the handler to register; it is keyed by its own path.
 */
def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
  zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
  val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
  // NONODE is acceptable here: the handler stays registered so it can fire once the znode is created.
  if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) {
    throw existsResponse.resultException.get
  }
}

/**
 * See ZooKeeperClient.registerZNodeChangeHandler
 * @param zNodeChangeHandler the znode change handler to register.
 */
def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
  zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
}

/**
 * See ZooKeeperClient.unregisterZNodeChangeHandler
 * @param path the znode path whose change handler should be unregistered.
 */
def unregisterZNodeChangeHandler(path: String): Unit = {
  zooKeeperClient.unregisterZNodeChangeHandler(path)
}

/**
 * See ZooKeeperClient.registerZNodeChildChangeHandler
 * @param zNodeChildChangeHandler the child change handler to register.
 */
def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
  zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
}

/**
 * See ZooKeeperClient.unregisterZNodeChildChangeHandler
 * @param path the znode path whose child change handler should be unregistered.
 */
def unregisterZNodeChildChangeHandler(path: String): Unit = {
  zooKeeperClient.unregisterZNodeChildChangeHandler(path)
}

/**
 * Close the underlying ZooKeeperClient.
+ */ + def close(): Unit = { + zooKeeperClient.close() + } + + private def deleteRecursive(path: String): Unit = { + val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) + if (getChildrenResponse.resultCode == Code.OK) { + getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child")) + val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1)) + if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { + throw deleteResponse.resultException.get + } + } else if (getChildrenResponse.resultCode != Code.NONODE) { + throw getChildrenResponse.resultException.get + } + } + private def createTopicPartition(partitions: Seq[TopicAndPartition]) = { + val createRequests = partitions.map { partition => + val path = TopicPartitionZNode.path(partition) + CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition)) + } + retryRequestsUntilConnected(createRequests) + } + + private def createTopicPartitions(topics: Seq[String]) = { + val createRequests = topics.map { topic => + val path = TopicPartitionsZNode.path(topic) + CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic)) + } + retryRequestsUntilConnected(createRequests) + } + + private def getTopicConfigs(topics: Seq[String]) = { + val getDataRequests = topics.map { topic => + GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic)) + } + retryRequestsUntilConnected(getDataRequests) + } + + private def acls(path: String) = { + import scala.collection.JavaConverters._ + ZkUtils.defaultAcls(isSecure, path).asScala + } + + private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { + retryRequestsUntilConnected(Seq(request)).head + } + + private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { + val remainingRequests = ArrayBuffer(requests: _*) + val responses = new ArrayBuffer[Req#Response] + while 
(remainingRequests.nonEmpty) { + val batchResponses = zooKeeperClient.handleRequests(remainingRequests) + + // Only execute slow path if we find a response with CONNECTIONLOSS + if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) { + val requestResponsePairs = remainingRequests.zip(batchResponses) + + remainingRequests.clear() + requestResponsePairs.foreach { case (request, response) => + if (response.resultCode == Code.CONNECTIONLOSS) + remainingRequests += request + else + responses += response + } + + if (remainingRequests.nonEmpty) + zooKeeperClient.waitUntilConnected() + } else { + remainingRequests.clear() + responses ++= batchResponses + } + } + responses + } + + def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = { + val checkedEphemeral = new CheckedEphemeral(path, data) + info(s"Creating $path (is it secure? $isSecure)") + val code = checkedEphemeral.create() + info(s"Result of znode creation at $path is: $code") + code match { + case Code.OK => + case _ => throw KeeperException.create(code) + } + } + + private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging { + def create(): Code = { + val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL) + val createResponse = retryRequestUntilConnected(createRequest) + val code = createResponse.resultCode + if (code == Code.OK) { + code + } else if (code == Code.NODEEXISTS) { + get() + } else { + error(s"Error while creating ephemeral at $path with return code: $code") + code + } + } + + private def get(): Code = { + val getDataRequest = GetDataRequest(path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + val code = getDataResponse.resultCode + if (code == Code.OK) { + if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) { + error(s"Error while creating ephemeral at $path with return code: $code") + Code.NODEEXISTS + } else { + code + } + } else if (code == Code.NONODE) { + info(s"The ephemeral node at 
$path went away while reading it") + create() + } else { + error(s"Error while creating ephemeral at $path with return code: $code") + code + } + } + } +} + +object KafkaZkClient { + + /** + * @param successfulPartitions The successfully updated partition states with adjusted znode versions. + * @param partitionsToRetry The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts + * can occur if the partition leader updated partition state while the controller attempted to + * update partition state. + * @param failedPartitions Exceptions corresponding to failed partition state updates. + */ + case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr], + partitionsToRetry: Seq[TopicAndPartition], + failedPartitions: Map[TopicAndPartition, Exception]) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/ZkData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala new file mode 100644 index 0000000..292523c --- /dev/null +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.zk + +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Properties + +import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} +import kafka.cluster.{Broker, EndPoint} +import kafka.common.TopicAndPartition +import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch} +import kafka.utils.Json +import org.apache.zookeeper.data.Stat + +import scala.collection.Seq + +// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). + +object ControllerZNode { + def path = "/controller" + def encode(brokerId: Int, timestamp: Long): Array[Byte] = + Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)) + def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => + js.asJsonObject("brokerid").to[Int] + } +} + +object ControllerEpochZNode { + def path = "/controller_epoch" + def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8) + def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt +} + +object ConfigZNode { + def path = "/config" +} + +object BrokersZNode { + def path = "/brokers" +} + +object BrokerIdsZNode { + def path = s"${BrokersZNode.path}/ids" + def encode: Array[Byte] = null +} + +object BrokerIdZNode { + def path(id: Int) = s"${BrokerIdsZNode.path}/$id" + def encode(id: Int, + host: String, + port: Int, + advertisedEndpoints: Seq[EndPoint], + jmxPort: Int, + rack: Option[String], + apiVersion: ApiVersion): Array[Byte] = { + val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 + Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes(UTF_8) + } + + def decode(id: Int, bytes: Array[Byte]): Broker = { + Broker.createBroker(id, new String(bytes, UTF_8)) + } +} + +object TopicsZNode { + def path = s"${BrokersZNode.path}/topics" +} + +object 
TopicZNode { + def path(topic: String) = s"${TopicsZNode.path}/$topic" + def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = { + val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas } + Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson)) + } + def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = { + Json.parseBytes(bytes).flatMap { js => + val assignmentJson = js.asJsonObject + val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject) + partitionsJsonOpt.map { partitionsJson => + partitionsJson.iterator.map { case (partition, replicas) => + TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]] + } + } + }.map(_.toMap).getOrElse(Map.empty) + } +} + +object TopicPartitionsZNode { + def path(topic: String) = s"${TopicZNode.path(topic)}/partitions" +} + +object TopicPartitionZNode { + def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}" +} + +object TopicPartitionStateZNode { + def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state" + def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = { + val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr + val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch + Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, + "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) + } + def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = { + Json.parseBytes(bytes).map { js => + val leaderIsrAndEpochInfo = js.asJsonObject + val leader = leaderIsrAndEpochInfo("leader").to[Int] + val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int] + val isr = leaderIsrAndEpochInfo("isr").to[List[Int]] + val controllerEpoch = 
leaderIsrAndEpochInfo("controller_epoch").to[Int] + val zkPathVersion = stat.getVersion + LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch) + } + } +} + +object ConfigEntityTypeZNode { + def path(entityType: String) = s"${ConfigZNode.path}/$entityType" +} + +object ConfigEntityZNode { + def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName" + def encode(config: Properties): Array[Byte] = { + import scala.collection.JavaConverters._ + Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala)) + } + def decode(bytes: Array[Byte]): Option[Properties] = { + Json.parseBytes(bytes).map { js => + val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption)) + val props = new Properties() + configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) }) + props + } + } +} + +object IsrChangeNotificationZNode { + def path = "/isr_change_notification" +} + +object IsrChangeNotificationSequenceZNode { + val SequenceNumberPrefix = "isr_change_" + def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber" + def encode(partitions: Set[TopicAndPartition]): Array[Byte] = { + val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition)) + Json.encodeAsBytes(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)) + } + + def decode(bytes: Array[Byte]): Set[TopicAndPartition] = { + Json.parseBytes(bytes).map { js => + val partitionsJson = js.asJsonObject("partitions").asJsonArray + partitionsJson.iterator.map { partitionsJson => + val partitionJson = partitionsJson.asJsonObject + val topic = partitionJson("topic").to[String] + val partition = partitionJson("partition").to[Int] + TopicAndPartition(topic, partition) + } + } + }.map(_.toSet).getOrElse(Set.empty) + def 
sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length) +} + +object LogDirEventNotificationZNode { + def path = "/log_dir_event_notification" +} + +object LogDirEventNotificationSequenceZNode { + val SequenceNumberPrefix = "log_dir_event_" + val LogDirFailureEvent = 1 + def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber" + def encode(brokerId: Int) = + Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)) + def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => + js.asJsonObject("broker").to[Int] + } + def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length) +} + +object AdminZNode { + def path = "/admin" +} + +object DeleteTopicsZNode { + def path = s"${AdminZNode.path}/delete_topics" +} + +object DeleteTopicsTopicZNode { + def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic" +} + +object ReassignPartitionsZNode { + def path = s"${AdminZNode.path}/reassign_partitions" + def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = { + val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) => + Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas) + } + Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson)) + } + def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js => + val reassignmentJson = js.asJsonObject + val partitionsJsonOpt = reassignmentJson.get("partitions") + partitionsJsonOpt.map { partitionsJson => + partitionsJson.asJsonArray.iterator.map { partitionFieldsJs => + val partitionFields = partitionFieldsJs.asJsonObject + val topic = partitionFields("topic").to[String] + val partition = partitionFields("partition").to[Int] + val replicas = 
partitionFields("replicas").to[Seq[Int]] + TopicAndPartition(topic, partition) -> replicas + } + } + }.map(_.toMap).getOrElse(Map.empty) +} + +object PreferredReplicaElectionZNode { + def path = s"${AdminZNode.path}/preferred_replica_election" + def encode(partitions: Set[TopicAndPartition]): Array[Byte] = { + val jsonMap = Map("version" -> 1, + "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))) + Json.encodeAsBytes(jsonMap) + } + def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js => + val partitionsJson = js.asJsonObject("partitions").asJsonArray + partitionsJson.iterator.map { partitionsJson => + val partitionJson = partitionsJson.asJsonObject + val topic = partitionJson("topic").to[String] + val partition = partitionJson("partition").to[Int] + TopicAndPartition(topic, partition) + } + }.map(_.toSet).getOrElse(Set.empty) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala new file mode 100644 index 0000000..0ff34c0 --- /dev/null +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zookeeper + +import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit} + +import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} +import kafka.utils.Logging +import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} +import org.apache.zookeeper.ZooKeeper.States +import org.apache.zookeeper.data.{ACL, Stat} +import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper} + +import scala.collection.JavaConverters._ + +/** + * A ZooKeeper client that encourages pipelined requests. + * + * @param connectString comma separated host:port pairs, each corresponding to a zk server + * @param sessionTimeoutMs session timeout in milliseconds + * @param connectionTimeoutMs connection timeout in milliseconds + * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread. 
+ */ +class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, + stateChangeHandler: StateChangeHandler) extends Logging { + this.logIdent = "[ZooKeeperClient] " + private val initializationLock = new ReentrantReadWriteLock() + private val isConnectedOrExpiredLock = new ReentrantLock() + private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() + private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala + private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala + + info(s"Initializing a new session to $connectString.") + @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher) + waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) + + /** + * Send a request and wait for its response. See handleRequests(Seq[AsyncRequest]) for details. + * + * @param request a single request to send and wait on. + * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse). + */ + def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = { + handleRequests(Seq(request)).head + } + + /** + * Send a pipelined sequence of requests and wait for all of their responses. + * + * The watch flag on each outgoing request will be set if we've already registered a handler for the + * path associated with the request. + * + * @param requests a sequence of requests to send and wait on. + * @return the responses for the requests. If all requests have the same type, the responses will have the respective + * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype + * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]). 
+ */ + def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) { + if (requests.isEmpty) + Seq.empty + else { + val countDownLatch = new CountDownLatch(requests.size) + val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) + + requests.foreach { request => + send(request) { response => + responseQueue.add(response) + countDownLatch.countDown() + } + } + countDownLatch.await() + responseQueue.asScala.toBuffer + } + } + + private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { + // Safe to cast as we always create a response of the right type + def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response]) + + request match { + case ExistsRequest(path, ctx) => + zooKeeper.exists(path, shouldWatch(request), new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) + case GetDataRequest(path, ctx) => + zooKeeper.getData(path, shouldWatch(request), new DataCallback { + override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = + callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat)) + }, ctx.orNull) + case GetChildrenRequest(path, ctx) => + zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { + override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit = + callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), + Option(children).map(_.asScala).getOrElse(Seq.empty), stat)) + }, ctx.orNull) + case CreateRequest(path, data, acl, createMode, ctx) => + zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { + override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit = + callback(CreateResponse(Code.get(rc), path, Option(ctx), 
name)) + }, ctx.orNull) + case SetDataRequest(path, data, version, ctx) => + zooKeeper.setData(path, data, version, new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) + case DeleteRequest(path, version, ctx) => + zooKeeper.delete(path, version, new VoidCallback { + override def processResult(rc: Int, path: String, ctx: Any): Unit = + callback(DeleteResponse(Code.get(rc), path, Option(ctx))) + }, ctx.orNull) + case GetAclRequest(path, ctx) => + zooKeeper.getACL(path, null, new ACLCallback { + override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = { + callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty), + stat)) + }}, ctx.orNull) + case SetAclRequest(path, acl, version, ctx) => + zooKeeper.setACL(path, acl.asJava, version, new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat)) + }, ctx.orNull) + } + } + + /** + * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state. + * @throws ZooKeeperClientAuthFailedException if the authentication failed either before or while waiting for connection. + * @throws ZooKeeperClientExpiredException if the session expired either before or while waiting for connection. 
+ */ + def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) { + waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS) + } + + private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = { + info("Waiting until connected.") + var nanos = timeUnit.toNanos(timeout) + inLock(isConnectedOrExpiredLock) { + var state = zooKeeper.getState + while (!state.isConnected && state.isAlive) { + if (nanos <= 0) { + throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state") + } + nanos = isConnectedOrExpiredCondition.awaitNanos(nanos) + state = zooKeeper.getState + } + if (state == States.AUTH_FAILED) { + throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection") + } else if (state == States.CLOSED) { + throw new ZooKeeperClientExpiredException("Session expired either before or while waiting for connection") + } + } + info("Connected.") + } + + // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler + // may need to be updated. + private def shouldWatch(request: AsyncRequest): Boolean = request match { + case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path) + case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path) + case _ => throw new IllegalArgumentException(s"Request $request is not watchable") + } + + /** + * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher. + * + * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest]) + * with either a GetDataRequest or ExistsRequest. + * + * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest. 
+ * + * @param zNodeChangeHandler the handler to register + */ + def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = { + zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler) + } + + /** + * Unregister the handler from ZooKeeperClient. This is just a local operation. + * @param path the path of the handler to unregister + */ + def unregisterZNodeChangeHandler(path: String): Unit = { + zNodeChangeHandlers.remove(path) + } + + /** + * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher. + * + * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest]) with a GetChildrenRequest. + * + * @param zNodeChildChangeHandler the handler to register + */ + def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = { + zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler) + } + + /** + * Unregister the handler from ZooKeeperClient. This is just a local operation. 
+ * @param path the path of the handler to unregister + */ + def unregisterZNodeChildChangeHandler(path: String): Unit = { + zNodeChildChangeHandlers.remove(path) + } + + def close(): Unit = inWriteLock(initializationLock) { + info("Closing.") + zNodeChangeHandlers.clear() + zNodeChildChangeHandlers.clear() + zooKeeper.close() + info("Closed.") + } + + def sessionId: Long = inReadLock(initializationLock) { + zooKeeper.getSessionId + } + + private def initialize(): Unit = { + if (!zooKeeper.getState.isAlive) { + info(s"Initializing a new session to $connectString.") + var now = System.currentTimeMillis() + val threshold = now + connectionTimeoutMs + while (now < threshold) { + try { + zooKeeper.close() + zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher) + waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS) + return + } catch { + case _: Exception => + now = System.currentTimeMillis() + if (now < threshold) { + Thread.sleep(1000) + now = System.currentTimeMillis() + } + } + } + info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}") + stateChangeHandler.onReconnectionTimeout() + } + } + + private object ZooKeeperClientWatcher extends Watcher { + override def process(event: WatchedEvent): Unit = { + debug("Received event: " + event) + Option(event.getPath) match { + case None => + inLock(isConnectedOrExpiredLock) { + isConnectedOrExpiredCondition.signalAll() + } + if (event.getState == KeeperState.AuthFailed) { + info("Auth failed.") + stateChangeHandler.onAuthFailure() + } else if (event.getState == KeeperState.Expired) { + inWriteLock(initializationLock) { + info("Session expired.") + stateChangeHandler.beforeInitializingSession() + initialize() + stateChangeHandler.afterInitializingSession() + } + } + case Some(path) => + (event.getType: @unchecked) match { + case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange()) + case 
EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation()) + case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion()) + case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange()) + } + } + } + } +} + +trait StateChangeHandler { + def beforeInitializingSession(): Unit = {} + def afterInitializingSession(): Unit = {} + def onAuthFailure(): Unit = {} + def onReconnectionTimeout(): Unit = {} +} + +trait ZNodeChangeHandler { + val path: String + def handleCreation(): Unit = {} + def handleDeletion(): Unit = {} + def handleDataChange(): Unit = {} +} + +trait ZNodeChildChangeHandler { + val path: String + def handleChildChange(): Unit = {} +} + +sealed trait AsyncRequest { + /** + * This type member allows us to define methods that take requests and return responses with the correct types. + * See ``ZooKeeperClient.handleRequests`` for example. + */ + type Response <: AsyncResponse + def path: String + def ctx: Option[Any] +} + +case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, + ctx: Option[Any] = None) extends AsyncRequest { + type Response = CreateResponse +} + +case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest { + type Response = DeleteResponse +} + +case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = ExistsResponse +} + +case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetDataResponse +} + +case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest { + type Response = SetDataResponse +} + +case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetAclResponse +} + +case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends 
AsyncRequest { + type Response = SetAclResponse +} + +case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { + type Response = GetChildrenResponse +} + +sealed trait AsyncResponse { + def resultCode: Code + def path: String + def ctx: Option[Any] + + /** Return None if the result code is OK and KeeperException otherwise. */ + def resultException: Option[KeeperException] = + if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path)) +} +case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse +case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse +case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse +case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse +case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse +case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse + +class ZooKeeperClientException(message: String) extends RuntimeException(message) +class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message) +class ZooKeeperClientAuthFailedException(message: String) extends ZooKeeperClientException(message) +class ZooKeeperClientTimeoutException(message: String) extends ZooKeeperClientException(message) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index ea306a8..99ddcc3 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -47,7 +47,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { /** * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths - * when zookeeper.set.acl=false, even if Zookeeper is SASL-enabled. + * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled. */ @Test def testZkAclsDisabled() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 1ca5500..06ddd66 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -221,7 +221,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { } catch { case _: UnknownTopicOrPartitionException => // expected exception } - // verify delete topic path for test2 is removed from zookeeper + // verify delete topic path for test2 is removed from ZooKeeper TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers) // verify that topic test is untouched TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index a646ced..7fc2436 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -53,7 +53,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { /* * There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time - * with the time stored in zookeeper stat and the embedded zookeeper server does not provide a way to mock time. + * with the time stored in ZooKeeper stat and the embedded ZooKeeper server does not provide a way to mock time. * So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check * Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion * can fail as the second node can be deleted depending on how threads get scheduled.
