This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9469831  [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available
9469831 is described below

commit 9469831c3751e898ebe78cb642266b50ea167f22
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Mon May 11 17:25:41 2020 -0700

    [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available
    
    ### What changes were proposed in this pull request?
    
    This patch re-obtains tokens at the start of the AM in yarn cluster mode, if a principal and keytab are available (the diff below does this when the AM is not on its first attempt). The AM already transfers the original user's credentials to the logged-in user, so this patch puts the new tokens into the original user's credentials, overwriting the existing ones.
    
    To obtain tokens from providers defined in the user application, this patch sets the user classloader as the context classloader while initializing the token manager at AM startup.
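    
    The classloader swap described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not Spark's own `Utils.withContextClassLoader`; the object name `ContextClassLoaderSketch` is hypothetical. The assumption here is that token providers are discovered through the thread's context classloader, which is why it has to point at the user's jars while the token manager is created.
    
    ```scala
    object ContextClassLoaderSketch {
      // Hypothetical helper (an assumption, not Spark's implementation):
      // run `body` with `loader` installed as the current thread's context
      // classloader, then restore the previous one even if `body` throws.
      def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
        val thread = Thread.currentThread()
        val previous = thread.getContextClassLoader
        thread.setContextClassLoader(loader)
        try body
        finally thread.setContextClassLoader(previous)
      }
    }
    ```
    
    Restoring the previous loader in `finally` keeps the swap scoped to the block, so code that runs afterwards on the same thread is not affected.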
    
    ### Why are the changes needed?
    
    The submitter obtains delegation tokens for yarn-cluster mode and adds these credentials to the launch context. The AM is launched with these credentials, and both the AM and the driver can use these tokens.
    
    In YARN cluster mode, the driver is launched in the AM, which in turn initializes the token manager (while initializing SparkContext) and obtains delegation tokens (and schedules their renewal) if both principal and keytab are available.
    
    That said, even if we provide a principal and keytab to run the application in yarn-cluster mode, the AM always starts with the initial tokens from the launch context until the token manager runs and obtains delegation tokens.
    
    So there's a "gap": if user code (the driver) accesses an external system with delegation tokens (e.g. HDFS) before initializing SparkContext, it cannot use the tokens that the token manager will obtain. This makes the application fail if the AM is killed and relaunched "after" the initial tokens have expired.
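    
    A toy model of this gap, and of the overwrite that closes it, might look like the sketch below. Everything in it is an assumption made for illustration: `Token`, `overwrite` and `usable` are hypothetical names, and real delegation tokens are Hadoop objects rather than a case class with an expiry timestamp.
    
    ```scala
    object TokenOverwriteSketch {
      // Hypothetical stand-in for a delegation token, keyed by service name.
      final case class Token(service: String, expiresAtMillis: Long)
    
      // Fresh tokens replace original tokens with the same service key;
      // tokens present only in the original set are kept as they are.
      def overwrite(original: Map[String, Token], fresh: Seq[Token]): Map[String, Token] =
        original ++ fresh.map(t => t.service -> t)
    
      // A token is usable if it exists and has not expired yet.
      def usable(tokens: Map[String, Token], service: String, nowMillis: Long): Boolean =
        tokens.get(service).exists(_.expiresAtMillis > nowMillis)
    }
    ```
    
    In this model a relaunched AM that only has the launch-context tokens sees an expired HDFS token, while the same AM after `overwrite` sees a usable one and still keeps the tokens that were not re-obtained.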
    
    This is also a regression; see the code below in branch-2.4:
    
    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
    
    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
    
    In Spark 2.4.x, the AM runs AMCredentialRenewer at initialization, and AMCredentialRenewer obtains tokens and merges them with the credentials provided with the AM's launch context. So it guarantees new tokens when the driver runs.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested with a specifically crafted application (a simple reproducer) - https://github.com/HeartSaVioR/spark-delegation-token-experiment/blob/master/src/main/scala/net/heartsavior/spark/example/LongRunningAppWithHDFSConfig.scala
    
    Before this patch, a new AM attempt failed when I killed the AM after the tokens had expired. After this patch, the new AM attempt runs fine.
    
    Closes #28336 from HeartSaVioR/SPARK-31559.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@apache.org>
    (cherry picked from commit 842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a)
    Signed-off-by: Marcelo Vanzin <van...@apache.org>
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1e8f408..862acd8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -860,10 +861,22 @@ object ApplicationMaster extends Logging {
     val ugi = sparkConf.get(PRINCIPAL) match {
       // We only need to log in with the keytab in cluster mode. In client mode, the driver
       // handles the user keytab.
-      case Some(principal) if amArgs.userClass != null =>
+      case Some(principal) if master.isClusterMode =>
         val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
         SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
         val newUGI = UserGroupInformation.getCurrentUser()
+
+        if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
+          // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
+          // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
+          // Set the context class loader so that the token manager has access to jars
+          // distributed by the user.
+          Utils.withContextClassLoader(master.userClassLoader) {
+            val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
+            credentialManager.obtainDelegationTokens(originalCreds)
+          }
+        }
+
         // Transfer the original user's tokens to the new user, since it may contain needed tokens
         // (such as those user to connect to YARN).
         newUGI.addCredentials(originalCreds)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
