Repository: spark
Updated Branches:
  refs/heads/master 318bf4115 -> 5c3912e5c


[SPARK-12523][YARN] Support long-running of the Spark On HBase and hive meta 
store.

Obtain the Hive metastore and HBase tokens, as well as the HDFS token, in 
AMDelegationTokenRenewer to support long-running applications of Spark on HBase or 
the Thrift server.

Author: huangzhaowei <carlmartin...@gmail.com>

Closes #10645 from SaintBacchus/SPARK-12523.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3912e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3912e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3912e5

Branch: refs/heads/master
Commit: 5c3912e5c90ce659146c3056430d100604378b71
Parents: 318bf41
Author: huangzhaowei <carlmartin...@gmail.com>
Authored: Fri Feb 26 07:32:07 2016 -0600
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Fri Feb 26 07:32:07 2016 -0600

----------------------------------------------------------------------
 .../deploy/yarn/AMDelegationTokenRenewer.scala  |  2 +
 .../org/apache/spark/deploy/yarn/Client.scala   | 42 +-------------------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 38 ++++++++++++++++++
 3 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 2ac9e33..70b67d2 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -172,6 +172,8 @@ private[yarn] class AMDelegationTokenRenewer(
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
         hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
+        hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, 
tempCreds)
+        hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds)
         null
       }
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 530f1d7..dac3ea2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -345,8 +345,8 @@ private[spark] class Client(
     // multiple times, YARN will fail to launch containers for the app with an 
internal
     // error.
     val distributedUris = new HashSet[String]
-    obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
-    obtainTokenForHBase(sparkConf, hadoopConf, credentials)
+    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, 
credentials)
+    YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, 
credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
       fs.getDefaultReplication(dst)).toShort
@@ -1358,35 +1358,6 @@ object Client extends Logging {
   }
 
   /**
-   * Obtains token for the Hive metastore and adds them to the credentials.
-   */
-  private def obtainTokenForHiveMetastore(
-      sparkConf: SparkConf,
-      conf: Configuration,
-      credentials: Credentials) {
-    if (shouldGetTokens(sparkConf, "hive") && 
UserGroupInformation.isSecurityEnabled) {
-      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
-        credentials.addToken(new Text("hive.server2.delegation.token"), _)
-      }
-    }
-  }
-
-  /**
-   * Obtain a security token for HBase.
-   */
-  def obtainTokenForHBase(
-      sparkConf: SparkConf,
-      conf: Configuration,
-      credentials: Credentials): Unit = {
-    if (shouldGetTokens(sparkConf, "hbase") && 
UserGroupInformation.isSecurityEnabled) {
-      YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
-        credentials.addToken(token.getService, token)
-        logInfo("Added HBase security token to credentials.")
-      }
-    }
-  }
-
-  /**
    * Return whether the two file systems are the same.
    */
   private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
@@ -1450,13 +1421,4 @@ object Client extends Logging {
     components.mkString(Path.SEPARATOR)
   }
 
-  /**
-   * Return whether delegation tokens should be retrieved for the given 
service when security is
-   * enabled. By default, tokens are retrieved, but that behavior can be 
changed by setting
-   * a service-specific configuration.
-   */
-  def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
-    conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5c3912e5/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 272f129..4c9432d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -133,6 +133,44 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     }
   }
 
+  /**
+    * Obtains token for the Hive metastore and adds them to the credentials.
+    */
+  def obtainTokenForHiveMetastore(
+      sparkConf: SparkConf,
+      conf: Configuration,
+      credentials: Credentials) {
+    if (shouldGetTokens(sparkConf, "hive") && 
UserGroupInformation.isSecurityEnabled) {
+      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+        credentials.addToken(new Text("hive.server2.delegation.token"), _)
+      }
+    }
+  }
+
+  /**
+    * Obtain a security token for HBase.
+    */
+  def obtainTokenForHBase(
+      sparkConf: SparkConf,
+      conf: Configuration,
+      credentials: Credentials): Unit = {
+    if (shouldGetTokens(sparkConf, "hbase") && 
UserGroupInformation.isSecurityEnabled) {
+      YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
+        credentials.addToken(token.getService, token)
+        logInfo("Added HBase security token to credentials.")
+      }
+    }
+  }
+
+  /**
+    * Return whether delegation tokens should be retrieved for the given 
service when security is
+    * enabled. By default, tokens are retrieved, but that behavior can be 
changed by setting
+    * a service-specific configuration.
+    */
+  private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
+    conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
+  }
+
   private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: 
SparkConf): Unit = {
     tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
     tokenRenewer.get.updateCredentialsIfRequired()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to