Repository: spark Updated Branches: refs/heads/master eea2b877c -> b9ab791a9
[SPARK-21890] Credentials not being passed to add the tokens. I observed this while running an Oozie job trying to connect to HBase via Spark. It looks like the credentials are not being passed in https://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53 for the 2.2 release. More info on why it fails on a secure grid: the Oozie client obtains the tokens the application needs before launching. It passes those tokens to the Oozie launcher job (an MR job), which then calls the Spark client to launch the Spark app and passes the tokens along. The Oozie launcher job cannot obtain any more tokens because all it has are tokens (you can't get tokens with tokens; you need a TGT or keytab). The error occurs because the launcher job runs the Spark client to submit the Spark job, but the Spark client doesn't see that it already has the HDFS tokens, so it tries to get more, which fails with an exception. SPARK-19021 generalized the HDFS credentials provider and, in doing so, stopped passing the existing credentials into the call that fetches tokens, so the provider no longer realizes it already has the necessary tokens. https://issues.apache.org/jira/browse/SPARK-21890 Modified to pass the existing credentials when getting delegation tokens. Additionally, the token renewal interval is now computed only when the `PRINCIPAL` config entry is set, using that principal as the renewer; otherwise no renewal interval is computed. Author: Sanket Chintapalli <schin...@yahoo-inc.com> Closes #19140 from redsanket/SPARK-21890-master. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9ab791a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9ab791a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9ab791a Branch: refs/heads/master Commit: b9ab791a9efb0dc165ba283c91acf831fa6be5d8 Parents: eea2b87 Author: Sanket Chintapalli <schin...@yahoo-inc.com> Authored: Thu Sep 7 11:25:24 2017 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Thu Sep 7 11:25:24 2017 -0500 ---------------------------------------------------------------------- .../security/HBaseDelegationTokenProvider.scala | 2 + .../security/HadoopDelegationTokenManager.scala | 2 +- .../HadoopDelegationTokenProvider.scala | 3 ++ .../HadoopFSDelegationTokenProvider.scala | 50 ++++++++++---------- .../security/HiveDelegationTokenProvider.scala | 2 + .../HadoopDelegationTokenManagerSuite.scala | 4 +- 6 files changed, 35 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala index 35621da..78b0e6b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -34,6 +35,7 @@ private[security] 
class HBaseDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { try { val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index c317c4f..c134b7e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -116,7 +116,7 @@ private[spark] class HadoopDelegationTokenManager( creds: Credentials): Long = { delegationTokenProviders.values.flatMap { provider => if (provider.delegationTokensRequired(hadoopConf)) { - provider.obtainDelegationTokens(hadoopConf, creds) + provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) } else { logDebug(s"Service ${provider.serviceName} does not require a token." 
+ s" Check your configuration to see if security is disabled or not.") http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala index f162e7e..1ba245e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala @@ -20,6 +20,8 @@ package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials +import org.apache.spark.SparkConf + /** * Hadoop delegation token provider. */ @@ -46,5 +48,6 @@ private[spark] trait HadoopDelegationTokenProvider { */ def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] } http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index f0ac7f5..300773c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -26,8 +26,9 @@ import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier -import 
org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { @@ -41,21 +42,20 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { val fsToGetTokens = fileSystems(hadoopConf) - val newCreds = fetchDelegationTokens( - getTokenRenewer(hadoopConf), - fsToGetTokens) + val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens) } // Get the time of next renewal. 
val nextRenewalDate = tokenRenewalInterval.flatMap { interval => - val nextRenewalDates = newCreds.getAllTokens.asScala + val nextRenewalDates = fetchCreds.getAllTokens.asScala .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]) .map { token => val identifier = token @@ -66,7 +66,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min) } - creds.addAll(newCreds) nextRenewalDate } @@ -89,9 +88,8 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration private def fetchDelegationTokens( renewer: String, - filesystems: Set[FileSystem]): Credentials = { - - val creds = new Credentials() + filesystems: Set[FileSystem], + creds: Credentials): Credentials = { filesystems.foreach { fs => logInfo("getting token for: " + fs) @@ -103,25 +101,27 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration private def getTokenRenewalInterval( hadoopConf: Configuration, + sparkConf: SparkConf, filesystems: Set[FileSystem]): Option[Long] = { // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. 
- val creds = fetchDelegationTokens( - UserGroupInformation.getCurrentUser.getUserName, - filesystems) - - val renewIntervals = creds.getAllTokens.asScala.filter { - _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] - }.flatMap { token => - Try { - val newExpiration = token.renew(hadoopConf) - val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") - interval - }.toOption + sparkConf.get(PRINCIPAL).flatMap { renewer => + val creds = new Credentials() + fetchDelegationTokens(renewer, filesystems, creds) + + val renewIntervals = creds.getAllTokens.asScala.filter { + _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] + }.flatMap { token => + Try { + val newExpiration = token.renew(hadoopConf) + val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") + interval + }.toOption + } + if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } - if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index 53b9f89..b31cc59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text import 
org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -61,6 +62,7 @@ private[security] class HiveDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, + sparkConf: SparkConf, creds: Credentials): Option[Long] = { try { val conf = hiveConf(hadoopConf) http://git-wip-us.apache.org/repos/asf/spark/blob/b9ab791a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 5b05521..eeffc36 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -94,7 +94,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { val hiveCredentialProvider = new HiveDelegationTokenProvider() val credentials = new Credentials() - hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials) + hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials) credentials.getAllTokens.size() should be (0) } @@ -105,7 +105,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { val hbaseTokenProvider = new HBaseDelegationTokenProvider() val creds = new Credentials() - hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds) + hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds) creds.getAllTokens.size should be (0) } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org