This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 006c2dca6d87 [SPARK-46977][CORE] A failed request to obtain a token from one NameNode should not skip subsequent token requests

006c2dca6d87 is described below

commit 006c2dca6d87e29a69e30124e8320c275859d148
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Mon Feb 5 12:18:20 2024 -0800

    [SPARK-46977][CORE] A failed request to obtain a token from one NameNode should not skip subsequent token requests

### What changes were proposed in this pull request?

This PR enhances the `HadoopFSDelegationTokenProvider` to tolerate failures when fetching tokens from multiple NameNodes.

### Why are the changes needed?

Say we are going to access three HDFS clusters, `nn-1`, `nn-2`, and `nn-3`, in YARN cluster mode with a TGT cache, where `nn-1` is the `defaultFs` used by YARN to store aggregated logs, and `nn-2` has issues and cannot issue tokens.

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.kerberos.access.hadoopFileSystems=hdfs://nn-1,hdfs://nn-2,hdfs://nn-3 \
  ...
```

During the submission phase, Spark calls `HadoopFSDelegationTokenProvider` to fetch tokens from all declared NameNodes one by one, in **indeterminate** order (`HadoopFSDelegationTokenProvider.hadoopFSsToAccess` processes and returns a `Set[FileSystem]`), so the order may not match the order the user declared in `spark.kerberos.access.hadoopFileSystems`.

If the order is [`nn-1`, `nn-2`, `nn-3`], Spark requests a token from `nn-1` successfully but fails for `nn-2` with the error below, and the remaining `nn-3` is skipped. Such a failure does NOT block the submission, so the Spark app is submitted with only the `nn-1` token.
```
2024-01-03 12:41:36 [WARN] [main] org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider#94 - Failed to get token from service hadoopfs
org.apache.hadoop.ipc.RemoteException: <Some Error Message>
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507) ~[hadoop-common-2.9.2.2.jar:?]
	...
	at org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2604) ~[hadoop-hdfs-client-2.9.2.2.jar:?]
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.$anonfun$fetchDelegationTokens$1(HadoopFSDelegationTokenProvider.scala:122) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:335) ~[scala-library-2.12.15.jar:?]
	at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
	at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.fetchDelegationTokens(HadoopFSDelegationTokenProvider.scala:115) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	...
	at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.scala:146) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	at org.apache.spark.deploy.yarn.Client.setupSecurityToken(Client.scala:352) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
	at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1140) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
	...
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
```

When the Spark app then accesses `nn-2` and `nn-3`, it fails with `o.a.h.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]`.

Things are worse if the FS order is [`nn-3`, `nn-2`, `nn-1`]: the Spark app is submitted to YARN with only the `nn-3` token, and the NodeManager then has no chance to upload aggregated logs after the application exits, because doing so requires the app to provide a token for `nn-1`.

The log from the NodeManager:

```
2024-01-03 08:08:14,028 [3173570620] - WARN [NM ContainerManager dispatcher:Client$Connection1$772] - Exception encountered while connecting to the server
Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
	at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:179)
	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:392)
	...
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1768)
	...
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.createAppDir(LogAggregationService.java:404)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initAppAggregator(LogAggregationService.java:273)
	...
```

Without the logs, we don't even know what happened. Because the order in which NameNodes are asked for tokens is **indeterminate**, such a job sometimes submits successfully and sometimes fails without logs.

<img width="1903" alt="image" src="https://github.com/apache/spark/assets/26535726/7ca5c871-6399-4eae-b689-d6d741c1c373">

### Does this PR introduce _any_ user-facing change?
Yes. When the user configures `spark.kerberos.access.hadoopFileSystems` to access multiple Kerberized HDFS clusters and one or more NameNodes have issues, tokens are still fetched from the remaining healthy NameNodes after this patch.

### How was this patch tested?

Tested in an internal Kerberized cluster.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45030 from pan3793/SPARK-46977.

Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/deploy/security/HadoopFSDelegationTokenProvider.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 9242fe82d249..8eb45238b477 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.security.HadoopDelegationTokenProvider
+import org.apache.spark.util.Utils
 
 private[deploy] class HadoopFSDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {
@@ -116,10 +117,10 @@ private[deploy] class HadoopFSDelegationTokenProvider
       if (fsToExclude.contains(fs.getUri.getHost)) {
         // YARN RM skips renewing token with empty renewer
         logInfo(s"getting token for: $fs with empty renewer to skip renewal")
-        fs.addDelegationTokens("", creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens("", creds) }
       } else {
         logInfo(s"getting token for: $fs with renewer $renewer")
-        fs.addDelegationTokens(renewer, creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens(renewer, creds) }
       }
     }
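The essence of the fix can be sketched outside Spark: wrap each per-filesystem token request in a non-fatal-error guard (the role `Utils.tryLogNonFatalError` plays in the diff) so that one NameNode's failure does not abort the loop over the remaining ones. The `fetchToken` helper and the NameNode names below are hypothetical stand-ins for `fs.addDelegationTokens` and real clusters, not Spark or Hadoop APIs:

```scala
import scala.util.control.NonFatal

object TolerantTokenFetch {
  // Hypothetical stand-in for fs.addDelegationTokens: here, nn-2 always fails.
  def fetchToken(nn: String): String =
    if (nn == "nn-2") throw new RuntimeException(s"cannot issue token for $nn")
    else s"token-for-$nn"

  // Analogue of Utils.tryLogNonFatalError: run the block, log and swallow
  // non-fatal errors instead of letting them abort the caller's loop.
  def tryLogNonFatalError(block: => Unit): Unit =
    try block catch {
      case NonFatal(e) => println(s"WARN: ${e.getMessage}")
    }

  def fetchAll(nameNodes: Seq[String]): Map[String, String] = {
    val creds = scala.collection.mutable.Map.empty[String, String]
    nameNodes.foreach { nn =>
      // Guard each request individually; a failure here is logged, not fatal.
      tryLogNonFatalError { creds(nn) = fetchToken(nn) }
    }
    creds.toMap
  }

  def main(args: Array[String]): Unit = {
    val tokens = fetchAll(Seq("nn-1", "nn-2", "nn-3"))
    // nn-2's failure is logged, but nn-3 is still attempted.
    println(tokens.keys.toSeq.sorted.mkString(","))  // prints "nn-1,nn-3"
  }
}
```

With the pre-patch behavior (no guard), the exception from `nn-2` would propagate out of `foreach` and `nn-3` would never be asked, which is exactly the skipping this PR removes.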
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org