This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9469831  [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available
9469831 is described below

commit 9469831c3751e898ebe78cb642266b50ea167f22
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Mon May 11 17:25:41 2020 -0700

    [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available

    ### What changes were proposed in this pull request?

    This patch re-obtains tokens at the startup of the AM for yarn-cluster mode if principal and keytab are available. It basically transfers the credentials from the original user, so the patch puts the new tokens into the original user's credentials, overwriting the old ones. To obtain tokens from providers in the user application, the patch sets the user classloader as the context classloader while initializing the token manager at AM startup.

    ### Why are the changes needed?

    The submitter obtains delegation tokens for yarn-cluster mode and adds these credentials to the launch context. The AM is launched with these credentials, and both the AM and the driver are able to leverage these tokens. In yarn-cluster mode, the driver is launched in the AM, which in turn initializes the token manager (while initializing SparkContext) and obtains delegation tokens (and schedules their renewal) if both principal and keytab are available.

    That said, even if we provide principal and keytab to run the application in yarn-cluster mode, the AM always starts with the initial tokens from the launch context until the token manager runs and obtains fresh delegation tokens. So there is a "gap": if user code (the driver) accesses an external system that requires delegation tokens (e.g. HDFS) before initializing SparkContext, it cannot leverage the tokens the token manager will later obtain. This makes the application fail if the AM is killed "after" the initial tokens have expired and is then relaunched.

    This is even a regression; see the following code in branch-2.4:

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala

    In Spark 2.4.x, the AM runs AMCredentialRenewer at initialization, and AMCredentialRenewer obtains tokens and merges them with the credentials provided in the AM launch context, so fresh tokens are guaranteed when the driver runs.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Manually tested with a specifically crafted application (simple reproducer):
    https://github.com/HeartSaVioR/spark-delegation-token-experiment/blob/master/src/main/scala/net/heartsavior/spark/example/LongRunningAppWithHDFSConfig.scala

    Before this patch, the new AM attempt failed when I killed the AM after the tokens had expired. After this patch, the new AM attempt runs fine.

    Closes #28336 from HeartSaVioR/SPARK-31559.
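A minimal sketch of the kind of driver that hits the gap described above, for illustration only (hypothetical object name, app name, and path; this is not the linked reproducer):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object TokenGapSketch {
  def main(args: Array[String]): Unit = {
    // In yarn-cluster mode this code runs inside the AM. Before the patch, a
    // relaunched AM attempt only held the (possibly expired) tokens from the
    // launch context at this point, so this HDFS access could fail.
    val fs = FileSystem.get(new Configuration())
    fs.listStatus(new Path("/tmp")).foreach(status => println(status.getPath))

    // Only now does SparkContext initialization start the token manager that
    // obtains fresh delegation tokens and schedules their renewal.
    val spark = SparkSession.builder().appName("token-gap-sketch").getOrCreate()
    try {
      spark.range(10).count()
    } finally {
      spark.stop()
    }
  }
}
```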
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@apache.org>
(cherry picked from commit 842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a)
Signed-off-by: Marcelo Vanzin <van...@apache.org>
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1e8f408..862acd8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -860,10 +861,22 @@ object ApplicationMaster extends Logging {
     val ugi = sparkConf.get(PRINCIPAL) match {
       // We only need to log in with the keytab in cluster mode. In client mode, the driver
       // handles the user keytab.
-      case Some(principal) if amArgs.userClass != null =>
+      case Some(principal) if master.isClusterMode =>
         val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
         SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
         val newUGI = UserGroupInformation.getCurrentUser()
+
+        if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
+          // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
+          // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
+          // Set the context class loader so that the token manager has access to jars
+          // distributed by the user.
+          Utils.withContextClassLoader(master.userClassLoader) {
+            val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
+            credentialManager.obtainDelegationTokens(originalCreds)
+          }
+        }
+
         // Transfer the original user's tokens to the new user, since it may contain needed tokens
         // (such as those used to connect to YARN).
         newUGI.addCredentials(originalCreds)
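For reference, the Utils.withContextClassLoader helper used in the new block above (from org.apache.spark.util.Utils) runs a thunk with the given classloader installed as the thread's context classloader and restores the old one afterwards; this is what lets HadoopDelegationTokenManager discover token providers shipped in the user's jars. A simplified standalone sketch of that behavior (not the exact Spark source; the wrapping object name is hypothetical):

```scala
object ClassLoaderUtil {
  // Run `fn` with `ctxClassLoader` as the thread's context classloader,
  // restoring the previous loader afterwards even if `fn` throws.
  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
    val oldClassLoader = Thread.currentThread().getContextClassLoader()
    try {
      Thread.currentThread().setContextClassLoader(ctxClassLoader)
      fn
    } finally {
      Thread.currentThread().setContextClassLoader(oldClassLoader)
    }
  }
}
```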