Repository: spark
Updated Branches:
  refs/heads/master eea2b877c -> b9ab791a9


[SPARK-21890] Credentials not being passed to add the tokens

I observed this while running an Oozie job that tries to connect to HBase via Spark.
It looks like the credentials are not being passed in
https://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53
for the 2.2 release.
More info on why it fails on a secure grid:
The Oozie client gets the necessary tokens the application needs before launching.
It passes those tokens along to the Oozie launcher job (an MR job), which then
actually calls the Spark client to launch the Spark app and passes the tokens
along.
The Oozie launcher job cannot get any more tokens, because all it has is tokens
(you can't get tokens with tokens; you need a TGT or a keytab).
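
As a rough illustration of that constraint (a sketch, not code from this patch),
a token-only login can be distinguished from a TGT/keytab login via the Hadoop
UGI API; only the latter may request fresh delegation tokens:

    import org.apache.hadoop.security.UserGroupInformation

    // Sketch: can this process ask the NameNode for new delegation tokens?
    val ugi = UserGroupInformation.getCurrentUser
    if (ugi.hasKerberosCredentials) {
      // Logged in with a TGT or keytab: fetching new tokens works.
    } else {
      // Token-only login (the Oozie launcher case): must reuse the
      // delegation tokens handed to us at launch.
      val existing = ugi.getCredentials
    }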
The error here occurs because the launcher job runs the Spark client to submit
the Spark job, but the Spark client doesn't see that it already has the HDFS
tokens, so it tries to get more, which ends with the exception.
SPARK-19021 generalized the HDFS credentials provider and, in doing so, stopped
passing the existing credentials into the call that gets tokens, so the provider
doesn't realize it already has the necessary tokens.

https://issues.apache.org/jira/browse/SPARK-21890
Modified to pass the existing credentials into the call that obtains delegation tokens.
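
In spirit, the change threads the caller's existing Credentials through the
token fetch, so new tokens accumulate alongside the ones already present and
filesystems that are already covered are skipped. A minimal sketch, simplified
from the diff below (the body is illustrative, not the exact patch):

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.Credentials

    // Sketch: add tokens into the credentials the caller already holds,
    // rather than building a fresh Credentials object to merge back later.
    def fetchDelegationTokens(
        renewer: String,
        filesystems: Set[FileSystem],
        creds: Credentials): Credentials = {
      filesystems.foreach { fs =>
        // FileSystem#addDelegationTokens only fetches a token if `creds`
        // does not already hold one for that filesystem's service.
        fs.addDelegationTokens(renewer, creds)
      }
      creds
    }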

Author: Sanket Chintapalli <schin...@yahoo-inc.com>

Closes #19140 from redsanket/SPARK-21890-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9ab791a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9ab791a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9ab791a

Branch: refs/heads/master
Commit: b9ab791a9efb0dc165ba283c91acf831fa6be5d8
Parents: eea2b87
Author: Sanket Chintapalli <schin...@yahoo-inc.com>
Authored: Thu Sep 7 11:25:24 2017 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Thu Sep 7 11:25:24 2017 -0500

----------------------------------------------------------------------
 .../security/HBaseDelegationTokenProvider.scala |  2 +
 .../security/HadoopDelegationTokenManager.scala |  2 +-
 .../HadoopDelegationTokenProvider.scala         |  3 ++
 .../HadoopFSDelegationTokenProvider.scala       | 50 ++++++++++----------
 .../security/HiveDelegationTokenProvider.scala  |  2 +
 .../HadoopDelegationTokenManagerSuite.scala     |  4 +-
 6 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index 35621da..78b0e6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -34,6 +35,7 @@ private[security] class HBaseDelegationTokenProvider
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index c317c4f..c134b7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -116,7 +116,7 @@ private[spark] class HadoopDelegationTokenManager(
       creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
       if (provider.delegationTokensRequired(hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, creds)
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +
           s" Check your configuration to see if security is disabled or not.")

http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
index f162e7e..1ba245e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.security
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 
+import org.apache.spark.SparkConf
+
 /**
  * Hadoop delegation token provider.
  */
@@ -46,5 +48,6 @@ private[spark] trait HadoopDelegationTokenProvider {
    */
   def obtainDelegationTokens(
     hadoopConf: Configuration,
+    sparkConf: SparkConf,
     creds: Credentials): Option[Long]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index f0ac7f5..300773c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -26,8 +26,9 @@ import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 
 private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
     extends HadoopDelegationTokenProvider with Logging {
@@ -41,21 +42,20 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
 
     val fsToGetTokens = fileSystems(hadoopConf)
-    val newCreds = fetchDelegationTokens(
-      getTokenRenewer(hadoopConf),
-      fsToGetTokens)
+    val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
 
     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
     }
 
     // Get the time of next renewal.
     val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
-      val nextRenewalDates = newCreds.getAllTokens.asScala
+      val nextRenewalDates = fetchCreds.getAllTokens.asScala
         .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
         .map { token =>
           val identifier = token
@@ -66,7 +66,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }
 
-    creds.addAll(newCreds)
     nextRenewalDate
   }
 
@@ -89,9 +88,8 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   private def fetchDelegationTokens(
       renewer: String,
-      filesystems: Set[FileSystem]): Credentials = {
-
-    val creds = new Credentials()
+      filesystems: Set[FileSystem],
+      creds: Credentials): Credentials = {
 
     filesystems.foreach { fs =>
       logInfo("getting token for: " + fs)
@@ -103,25 +101,27 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   private def getTokenRenewalInterval(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       filesystems: Set[FileSystem]): Option[Long] = {
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    val creds = fetchDelegationTokens(
-      UserGroupInformation.getCurrentUser.getUserName,
-      filesystems)
-
-    val renewIntervals = creds.getAllTokens.asScala.filter {
-      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-    }.flatMap { token =>
-      Try {
-        val newExpiration = token.renew(hadoopConf)
-        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal interval is $interval for token 
${token.getKind.toString}")
-        interval
-      }.toOption
+    sparkConf.get(PRINCIPAL).flatMap { renewer =>
+      val creds = new Credentials()
+      fetchDelegationTokens(renewer, filesystems, creds)
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token 
${token.getKind.toString}")
+          interval
+        }.toOption
+      }
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
-    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index 53b9f89..b31cc59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -61,6 +62,7 @@ private[security] class HiveDelegationTokenProvider
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val conf = hiveConf(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 5b05521..eeffc36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -94,7 +94,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
 
     val hiveCredentialProvider = new HiveDelegationTokenProvider()
     val credentials = new Credentials()
-    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials)
+    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
 
     credentials.getAllTokens.size() should be (0)
   }
@@ -105,7 +105,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
 
     val hbaseTokenProvider = new HBaseDelegationTokenProvider()
     val creds = new Credentials()
-    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds)
+    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
 
     creds.getAllTokens.size should be (0)
   }

