Repository: kafka Updated Branches: refs/heads/trunk 3cfbb25c6 -> f300480f8
MINOR: Use match instead of if/else in KafkaZkClient Also use ZkVersion.NoVersion instead of -1. Author: Mickael Maison <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #4196 from mimaison/zkclient_refactor Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f300480f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f300480f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f300480f Branch: refs/heads/trunk Commit: f300480f882e8b17999e1172bfbec0f13c71eda7 Parents: 3cfbb25 Author: Mickael Maison <[email protected]> Authored: Wed Nov 15 12:50:07 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Wed Nov 15 13:37:07 2017 +0000 ---------------------------------------------------------------------- .../ZkNodeChangeNotificationListener.scala | 9 +- .../src/main/scala/kafka/zk/KafkaZkClient.scala | 385 ++++++++----------- core/src/main/scala/kafka/zk/ZkData.scala | 4 + 3 files changed, 175 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index f589430..f0d4b1b 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -84,11 +84,10 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient, val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { val changeZnode = seqNodeRoot + "/" + notification - val data = zkClient.getDataAndStat(changeZnode)._1.orNull - if (data != null) { - notificationHandler.processNotification(data) - } else { - logger.warn(s"read null data from $changeZnode when processing notification $notification") + val (data, _) = zkClient.getDataAndStat(changeZnode) + data match { + case Some(d) => notificationHandler.processNotification(d) + case None => logger.warn(s"read null data from $changeZnode when processing notification $notification") } lastExecutedChange = changeId } http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/zk/KafkaZkClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 97b7c98..a753c5b 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -133,13 +133,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } setDataResponses.foreach { setDataResponse => val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition] - if (setDataResponse.resultCode == Code.OK) { - val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion) - successfulUpdates.put(partition, updatedLeaderAndIsr) - } else if (setDataResponse.resultCode == Code.BADVERSION) { - updatesToRetry += partition - } else { - failed.put(partition, setDataResponse.resultException.get) + setDataResponse.resultCode match { + case Code.OK => + val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion) + successfulUpdates.put(partition, updatedLeaderAndIsr) + case Code.BADVERSION => updatesToRetry += partition + case _ => failed.put(partition, setDataResponse.resultException.get) } } UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) @@ -166,15 +165,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } configResponses.foreach { configResponse => val topic = configResponse.ctx.get.asInstanceOf[String] - if (configResponse.resultCode == Code.OK) { - val overrides = ConfigEntityZNode.decode(configResponse.data) - val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties)) - logConfigs.put(topic, logConfig) - } else if (configResponse.resultCode == Code.NONODE) { - val logConfig = LogConfig.fromProps(config, new Properties) - logConfigs.put(topic, logConfig) - } else { - failed.put(topic, configResponse.resultException.get) + configResponse.resultCode match { + case Code.OK => + val overrides = ConfigEntityZNode.decode(configResponse.data) + val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties)) + logConfigs.put(topic, logConfig) + case Code.NONODE => + val logConfig = LogConfig.fromProps(config, new Properties) + logConfigs.put(topic, logConfig) + case _ => failed.put(topic, configResponse.resultException.get) } } (logConfigs.toMap, failed.toMap) @@ -186,24 +185,24 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getAllBrokersInCluster: Seq[Broker] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - val brokerIds = getChildrenResponse.children.map(_.toInt) - val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId))) - val getDataResponses = retryRequestsUntilConnected(getDataRequests) - getDataResponses.flatMap { getDataResponse => - val brokerId = getDataResponse.ctx.get.asInstanceOf[Int] - if (getDataResponse.resultCode == Code.OK) { - Option(BrokerIdZNode.decode(brokerId, getDataResponse.data)) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => + val brokerIds = getChildrenResponse.children.map(_.toInt) + val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId))) + val getDataResponses = retryRequestsUntilConnected(getDataRequests) + getDataResponses.flatMap { getDataResponse => + val brokerId = getDataResponse.ctx.get.asInstanceOf[Int] + getDataResponse.resultCode match { + case Code.OK => + Option(BrokerIdZNode.decode(brokerId, getDataResponse.data)) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get + } } - } - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + case Code.NONODE => + Seq.empty + case _ => + throw getChildrenResponse.resultException.get } } @@ -213,13 +212,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getAllTopicsInCluster: Seq[String] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children + case Code.NONODE => Seq.empty + case _ => throw getChildrenResponse.resultException.get } + } /** @@ -239,12 +237,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getAllLogDirEventNotifications: Seq[String] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber) - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber) + case Code.NONODE => Seq.empty + case _ => throw getChildrenResponse.resultException.get } } @@ -259,12 +255,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } val getDataResponses = retryRequestsUntilConnected(getDataRequests) getDataResponses.flatMap { getDataResponse => - if (getDataResponse.resultCode == Code.OK) { - LogDirEventNotificationSequenceZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => LogDirEventNotificationSequenceZNode.decode(getDataResponse.data) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } } } @@ -302,12 +296,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq) getDataResponses.flatMap { getDataResponse => val topic = getDataResponse.ctx.get.asInstanceOf[String] - if (getDataResponse.resultCode == Code.OK) { - TopicZNode.decode(topic, getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Map.empty[TopicPartition, Seq[Int]] - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => TopicZNode.decode(topic, getDataResponse.data) + case Code.NONODE => Map.empty[TopicPartition, Seq[Int]] + case _ => throw getDataResponse.resultException.get } }.toMap } @@ -330,23 +322,14 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends * @param path zk node path * @return A tuple of 2 elements, where first element is zk node data as string * and second element is zk node version. - * returns (None, -1) if node doesn't exists and throws exception for any error + * returns (None, ZkVersion.NoVersion) if node doesn't exists and throws exception for any error */ def getDataAndVersion(path: String): (Option[String], Int) = { - val getDataRequest = GetDataRequest(path) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - - if (getDataResponse.resultCode == Code.OK) { - if (getDataResponse.data == null) - (None, getDataResponse.stat.getVersion) - else { - val data = new String(getDataResponse.data, UTF_8) - (Some(data), getDataResponse.stat.getVersion) - } - } else if (getDataResponse.resultCode == Code.NONODE) - (None, -1) - else - throw getDataResponse.resultException.get + val (data, stat) = getDataAndStat(path) + stat match { + case ZkStat.NoStat => (data, ZkVersion.NoVersion) + case _ => (data, stat.getVersion) + } } /** @@ -354,23 +337,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends * @param path zk node path * @return A tuple of 2 elements, where first element is zk node data as string * and second element is zk node stats. - * returns (None, new Stat()) if node doesn't exists and throws exception for any error + * returns (None, ZkStat.NoStat) if node doesn't exists and throws exception for any error */ def getDataAndStat(path: String): (Option[String], Stat) = { val getDataRequest = GetDataRequest(path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - if (getDataResponse.data == null) - (None, getDataResponse.stat) - else { - val data = new String(getDataResponse.data, UTF_8) - (Some(data), getDataResponse.stat) - } - } else if (getDataResponse.resultCode == Code.NONODE) { - (None, new Stat()) - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => + if (getDataResponse.data == null) + (None, getDataResponse.stat) + else { + val data = Option(getDataResponse.data).map(new String(_, UTF_8)) + (data, getDataResponse.stat) + } + case Code.NONODE => (None, ZkStat.NoStat) + case _ => throw getDataResponse.resultException.get } } @@ -381,18 +363,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getChildren(path : String): Seq[String] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children + case Code.NONODE => Seq.empty + case _ => throw getChildrenResponse.resultException.get } } /** * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't - * exist, the current version is not the expected version, etc.) return (false, -1) + * exist, the current version is not the expected version, etc.) return (false, ZkVersion.NoVersion) * * When there is a ConnectionLossException during the conditional update, ZookeeperClient will retry the update and may fail * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one). @@ -417,13 +397,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends debug("Checker method is not passed skipping zkData match") debug("Conditional update of path %s with data %s and expected version %d failed due to %s" .format(path, data, expectVersion, setDataResponse.resultException.get.getMessage)) - (false, -1) + (false, ZkVersion.NoVersion) } case Code.NONODE => debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, expectVersion, setDataResponse.resultException.get.getMessage)) - (false, -1) + (false, ZkVersion.NoVersion) case _ => debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, @@ -438,12 +418,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getTopicDeletions: Seq[String] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children + case Code.NONODE => Seq.empty + case _ => throw getChildrenResponse.resultException.get } } @@ -463,12 +441,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = { val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ReassignPartitionsZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Map.empty[TopicPartition, Seq[Int]] - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data) + case Code.NONODE => Map.empty + case _ => throw getDataResponse.resultException.get } } @@ -512,19 +488,17 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends /** * Gets topic partition states for the given partitions. - * @param partitions the partitions for which we want ot get states. + * @param partitions the partitions for which we want to get states. * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state. */ def getTopicPartitionStates(partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch] = { val getDataResponses = getTopicPartitionStatesRaw(partitions) getDataResponses.flatMap { getDataResponse => val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition] - if (getDataResponse.resultCode == Code.OK) { - TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } }.toMap } @@ -535,12 +509,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends */ def getAllIsrChangeNotifications: Seq[String] = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber) - } else if (getChildrenResponse.resultCode == Code.NONODE) { - Seq.empty - } else { - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber) + case Code.NONODE => Seq.empty + case _ => throw getChildrenResponse.resultException.get } } @@ -555,12 +527,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } val getDataResponses = retryRequestsUntilConnected(getDataRequests) getDataResponses.flatMap { getDataResponse => - if (getDataResponse.resultCode == Code.OK) { - IsrChangeNotificationSequenceZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => IsrChangeNotificationSequenceZNode.decode(getDataResponse.data) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } } } @@ -595,12 +565,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getPreferredReplicaElection: Set[TopicPartition] = { val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - PreferredReplicaElectionZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - Set.empty[TopicPartition] - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => PreferredReplicaElectionZNode.decode(getDataResponse.data) + case Code.NONODE => Set.empty + case _ => throw getDataResponse.resultException.get } } @@ -619,12 +587,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getControllerId: Option[Int] = { val getDataRequest = GetDataRequest(ControllerZNode.path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ControllerZNode.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => ControllerZNode.decode(getDataResponse.data) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } } @@ -643,13 +609,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getControllerEpoch: Option[(Int, Stat)] = { val getDataRequest = GetDataRequest(ControllerEpochZNode.path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - val epoch = ControllerEpochZNode.decode(getDataResponse.data) - Option(epoch, getDataResponse.stat) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => + val epoch = ControllerEpochZNode.decode(getDataResponse.data) + Option(epoch, getDataResponse.stat) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } } @@ -689,12 +654,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getVersionedAclsForResource(resource: Resource): VersionedAcls = { val getDataRequest = GetDataRequest(ResourceZNode.path(resource)) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ResourceZNode.decode(getDataResponse.data, getDataResponse.stat) - } else if (getDataResponse.resultCode == Code.NONODE) { - VersionedAcls(Set(), -1) - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => ResourceZNode.decode(getDataResponse.data, getDataResponse.stat) + case Code.NONODE => VersionedAcls(Set(), -1) + case _ => throw getDataResponse.resultException.get } } @@ -722,23 +685,17 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val setDataResponse = set(aclData, expectedVersion) setDataResponse.resultCode match { - case Code.OK => - (true, setDataResponse.stat.getVersion) + case Code.OK => (true, setDataResponse.stat.getVersion) case Code.NONODE => { val createResponse = create(aclData) createResponse.resultCode match { - case Code.OK => - (true, 0) - case Code.NODEEXISTS => - (false, 0) - case _ => - throw createResponse.resultException.get + case Code.OK => (true, 0) + case Code.NODEEXISTS => (false, 0) + case _ => throw createResponse.resultException.get } } - case Code.BADVERSION => - (false, 0) - case _ => - throw setDataResponse.resultException.get + case Code.BADVERSION => (false, 0) + case _ => throw setDataResponse.resultException.get } } @@ -750,9 +707,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val path = AclChangeNotificationSequenceZNode.createPath val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) val createResponse = retryRequestUntilConnected(createRequest) - if (createResponse.resultCode != Code.OK) { - throw createResponse.resultException.get - } + createResponse.resultException.foreach(e => throw e) } /** @@ -829,12 +784,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def conditionalDelete(resource: Resource, expectedVersion: Int): Boolean = { val deleteRequest = DeleteRequest(ResourceZNode.path(resource), expectedVersion) val deleteResponse = retryRequestUntilConnected(deleteRequest) - if (deleteResponse.resultCode == Code.OK || deleteResponse.resultCode == Code.NONODE) { - true - } else if (deleteResponse.resultCode == Code.BADVERSION) { - false - } else { - throw deleteResponse.resultException.get + deleteResponse.resultCode match { + case Code.OK | Code.NONODE => true + case Code.BADVERSION => false + case _ => throw deleteResponse.resultException.get } } @@ -929,12 +882,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends def getConsumerOffset(group: String, topicPartition: TopicPartition): Option[Long] = { val getDataRequest = GetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - ConsumerOffset.decode(getDataResponse.data) - } else if (getDataResponse.resultCode == Code.NONODE) { - None - } else { - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => ConsumerOffset.decode(getDataResponse.data) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get } } @@ -948,16 +899,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val setDataResponse = setConsumerOffset(group, topicPartition, offset) if (setDataResponse.resultCode == Code.NONODE) { val createResponse = createConsumerOffset(group, topicPartition, offset) - if (createResponse.resultCode != Code.OK) { - throw createResponse.resultException.get - } - } else if (setDataResponse.resultCode != Code.OK) { - throw setDataResponse.resultException.get + createResponse.resultException.foreach(e => throw e) + } else { + setDataResponse.resultException.foreach(e => throw e) } } private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = { - val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), ConsumerOffset.encode(offset), ZkVersion.NoVersion) + val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), + ConsumerOffset.encode(offset), ZkVersion.NoVersion) retryRequestUntilConnected(setDataRequest) } @@ -977,33 +927,32 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends /** * Deletes the given zk path recursively * @param path - * @return true if path gets deleted successfully, false if root path doesn't exists + * @return true if path gets deleted successfully, false if root path doesn't exist * @throws KeeperException if there is an error while deleting the znodes */ private[zk] def deleteRecursive(path: String): Boolean = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) - if (getChildrenResponse.resultCode == Code.OK) { - getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child")) - val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion)) - if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { - throw deleteResponse.resultException.get - } - true - } else if (getChildrenResponse.resultCode == Code.NONODE) { - false - } else - throw getChildrenResponse.resultException.get + getChildrenResponse.resultCode match { + case Code.OK => + getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child")) + val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion)) + if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { + throw deleteResponse.resultException.get + } + true + case Code.NONODE => false + case _ => throw getChildrenResponse.resultException.get + } } private[zk] def pathExists(path: String): Boolean = { val getDataRequest = GetDataRequest(path) val getDataResponse = retryRequestUntilConnected(getDataRequest) - if (getDataResponse.resultCode == Code.OK) { - true - } else if (getDataResponse.resultCode == Code.NONODE) { - false - } else - throw getDataResponse.resultException.get + getDataResponse.resultCode match { + case Code.OK => true + case Code.NONODE => false + case _ => throw getDataResponse.resultException.get + } } private[zk] def createRecursive(path: String): Unit = { @@ -1099,13 +1048,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL) val createResponse = retryRequestUntilConnected(createRequest) val code = createResponse.resultCode - if (code == Code.OK) { - code - } else if (code == Code.NODEEXISTS) { - get() - } else { - error(s"Error while creating ephemeral at $path with return code: $code") - code + code match { + case Code.OK => code + case Code.NODEEXISTS => get() + case _ => + error(s"Error while creating ephemeral at $path with return code: $code") + code } } @@ -1113,19 +1061,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends val getDataRequest = GetDataRequest(path) val getDataResponse = retryRequestUntilConnected(getDataRequest) val code = getDataResponse.resultCode - if (code == Code.OK) { - if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) { + code match { + case Code.OK => + if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) { + error(s"Error while creating ephemeral at $path with return code: $code") + Code.NODEEXISTS + } else { + code + } + case Code.NONODE => + info(s"The ephemeral node at $path went away while reading it") + create() + case _ => error(s"Error while creating ephemeral at $path with return code: $code") - Code.NODEEXISTS - } else { code - } - } else if (code == Code.NONODE) { - info(s"The ephemeral node at $path went away while reading it") - create() - } else { - error(s"Error while creating ephemeral at $path with return code: $code") - code } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/zk/ZkData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 400f0c7..4c618a0 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -255,6 +255,10 @@ object ZkVersion { val NoVersion = -1 } +object ZkStat { + val NoStat = new Stat() +} + object StateChangeHandlers { val ControllerHandler = "controller-state-change-handler" def zkNodeChangeListenerHandler(seqNodeRoot: String) = s"change-notification-$seqNodeRoot"
