Repository: spark Updated Branches: refs/heads/master 318bf4115 -> 5c3912e5c
[SPARK-12523][YARN] Support long-running of the Spark On HBase and hive meta store. Obtain the hive metastore and hbase token as well as hdfs token in DelegationTokenRenewer to support long-running application of spark on hbase or thriftserver. Author: huangzhaowei <carlmartin...@gmail.com> Closes #10645 from SaintBacchus/SPARK-12523. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3912e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3912e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3912e5 Branch: refs/heads/master Commit: 5c3912e5c90ce659146c3056430d100604378b71 Parents: 318bf41 Author: huangzhaowei <carlmartin...@gmail.com> Authored: Fri Feb 26 07:32:07 2016 -0600 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Fri Feb 26 07:32:07 2016 -0600 ---------------------------------------------------------------------- .../deploy/yarn/AMDelegationTokenRenewer.scala | 2 + .../org/apache/spark/deploy/yarn/Client.scala | 42 +------------------- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 38 ++++++++++++++++++ 3 files changed, 42 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index 2ac9e33..70b67d2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -172,6 +172,8 @@ private[yarn] class AMDelegationTokenRenewer( override def run(): Void = { val nns = 
YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds) + hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds) + hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds) null } }) http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 530f1d7..dac3ea2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -345,8 +345,8 @@ private[spark] class Client( // multiple times, YARN will fail to launch containers for the app with an internal // error. val distributedUris = new HashSet[String] - obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) - obtainTokenForHBase(sparkConf, hadoopConf, credentials) + YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) + YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -1358,35 +1358,6 @@ object Client extends Logging { } /** - * Obtains token for the Hive metastore and adds them to the credentials. - */ - private def obtainTokenForHiveMetastore( - sparkConf: SparkConf, - conf: Configuration, - credentials: Credentials) { - if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) { - YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { - credentials.addToken(new Text("hive.server2.delegation.token"), _) - } - } - } - - /** - * Obtain a security token for HBase. 
- */ - def obtainTokenForHBase( - sparkConf: SparkConf, - conf: Configuration, - credentials: Credentials): Unit = { - if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) { - YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token => - credentials.addToken(token.getService, token) - logInfo("Added HBase security token to credentials.") - } - } - } - - /** * Return whether the two file systems are the same. */ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { @@ -1450,13 +1421,4 @@ object Client extends Logging { components.mkString(Path.SEPARATOR) } - /** - * Return whether delegation tokens should be retrieved for the given service when security is - * enabled. By default, tokens are retrieved, but that behavior can be changed by setting - * a service-specific configuration. - */ - def shouldGetTokens(conf: SparkConf, service: String): Boolean = { - conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true) - } - } http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 272f129..4c9432d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -133,6 +133,44 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } } + /** + * Obtains token for the Hive metastore and adds them to the credentials. 
+ */ + def obtainTokenForHiveMetastore( + sparkConf: SparkConf, + conf: Configuration, + credentials: Credentials) { + if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) { + YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { + credentials.addToken(new Text("hive.server2.delegation.token"), _) + } + } + } + + /** + * Obtain a security token for HBase. + */ + def obtainTokenForHBase( + sparkConf: SparkConf, + conf: Configuration, + credentials: Credentials): Unit = { + if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) { + YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token => + credentials.addToken(token.getService, token) + logInfo("Added HBase security token to credentials.") + } + } + } + + /** + * Return whether delegation tokens should be retrieved for the given service when security is + * enabled. By default, tokens are retrieved, but that behavior can be changed by setting + * a service-specific configuration. + */ + private def shouldGetTokens(conf: SparkConf, service: String): Boolean = { + conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true) + } + private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = { tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf)) tokenRenewer.get.updateCredentialsIfRequired() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org