This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 006c2dca6d87 [SPARK-46977][CORE] A failed request to obtain a token from one NameNode should not skip subsequent token requests
006c2dca6d87 is described below

commit 006c2dca6d87e29a69e30124e8320c275859d148
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Mon Feb 5 12:18:20 2024 -0800

    [SPARK-46977][CORE] A failed request to obtain a token from one NameNode should not skip subsequent token requests
    
    ### What changes were proposed in this pull request?
    
    This PR enhances the `HadoopFSDelegationTokenProvider` to tolerate failures when fetching tokens from multiple NameNodes.
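
    The change wraps each per-FileSystem `addDelegationTokens` call in `Utils.tryLogNonFatalError` (see the diff at the bottom of this message), so a failure against one NameNode is logged and iteration continues with the next one. A minimal sketch of that pattern, simplified from Spark's `org.apache.spark.util.Utils` (the `println` stands in for Spark's `logError`):

    ```scala
    import scala.util.control.NonFatal

    // Run the block; log non-fatal exceptions instead of propagating
    // them, while still letting fatal errors (e.g. OutOfMemoryError)
    // escape via the NonFatal extractor.
    def tryLogNonFatalError(block: => Unit): Unit = {
      try {
        block
      } catch {
        case NonFatal(t) => println(s"Uncaught exception: $t")
      }
    }
    ```

    With this wrapper, a `RemoteException` from one NameNode no longer aborts the surrounding `foreach`, so tokens are still requested from the remaining filesystems.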
    
    ### Why are the changes needed?
    
    Suppose we are going to access three HDFS clusters, `nn-1`, `nn-2`, and `nn-3`, in YARN cluster mode with a TGT cache, where `nn-1` is the `defaultFS` that YARN uses to store aggregated logs, and `nn-2` has issues and cannot issue tokens.
    
    ```
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.kerberos.access.hadoopFileSystems=hdfs://nn-1,hdfs://nn-2,hdfs://nn-3 \
      ...
    ```
    
    During the submission phase, Spark calls `HadoopFSDelegationTokenProvider` to fetch tokens from all declared NameNodes one by one, in **indeterminate** order (`HadoopFSDelegationTokenProvider.hadoopFSsToAccess` processes and returns a `Set[FileSystem]`), so the order may not respect the user-declared order in `spark.kerberos.access.hadoopFileSystems`; see the sketch below.
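
    A minimal sketch (plain Scala, not Spark code; the URI strings are only illustrative) of why a hash-based `Set` loses the declared order:

    ```scala
    import scala.collection.immutable.HashSet

    // HashSet iterates in hash order, not insertion order, so the
    // user-declared NameNode order is effectively randomized.
    val declared = Seq("hdfs://nn-1", "hdfs://nn-2", "hdfs://nn-3")
    val fsSet: Set[String] = HashSet(declared: _*)
    // May print the URIs in a different order than `declared`, and
    // the order can differ across Scala and JVM versions.
    fsSet.foreach(println)
    ```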
    
    If the order is [`nn-1`, `nn-2`, `nn-3`], we request a token from `nn-1` successfully but fail for `nn-2` with the error below, and the remaining `nn-3` is skipped. Such a failure does NOT block the submission, so the Spark app is submitted with only the `nn-1` token.
    ```
    2024-01-03 12:41:36 [WARN] [main] org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider#94 - Failed to get token from service hadoopfs
    org.apache.hadoop.ipc.RemoteException: <Some Error Message>
      at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507) ~[hadoop-common-2.9.2.2.jar:?]
      ...
      at org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2604) ~[hadoop-hdfs-client-2.9.2.2.jar:?]
      at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.$anonfun$fetchDelegationTokens$1(HadoopFSDelegationTokenProvider.scala:122) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
      at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:335) ~[scala-library-2.12.15.jar:?]
      at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
      at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
      at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.fetchDelegationTokens(HadoopFSDelegationTokenProvider.scala:115) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
      ...
      at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.scala:146) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
      at org.apache.spark.deploy.yarn.Client.setupSecurityToken(Client.scala:352) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
      at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1140) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
      ...
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
    ```
    When the Spark app then accesses `nn-2` or `nn-3`, it fails with `o.a.h.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]`.
    
    Things get worse if the FS order is [`nn-3`, `nn-2`, `nn-1`]: the Spark app is submitted to YARN with only the `nn-3` token, and the NodeManager cannot even upload aggregated logs after the application exits, because doing so requires the app to provide a token for `nn-1`.
    
    The log from the NodeManager:
    ```
    2024-01-03 08:08:14,028 [3173570620] - WARN [NM ContainerManager dispatcher:Client$Connection1$772] - Exception encountered while connecting to the server
    Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
        at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:179)
        at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:392)
        ...
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1768)
        ...
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.createAppDir(LogAggregationService.java:404)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initAppAggregator(LogAggregationService.java:273)
        ...
    ```
    
    Without the aggregated logs, we don't even know what happened.
    
    Eventually, due to the **indeterminate** order in which tokens are requested from NameNodes, such a job sometimes submits successfully and sometimes fails without logs.
    
    <img width="1903" alt="image" src="https://github.com/apache/spark/assets/26535726/7ca5c871-6399-4eae-b689-d6d741c1c373">
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When the user configures `spark.kerberos.access.hadoopFileSystems` to access multiple Kerberized HDFS clusters and one or more NameNodes have issues, tokens are still fetched from the remaining healthy NameNodes after this patch.
    
    ### How was this patch tested?
    
    Tested in an internal Kerberized cluster.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45030 from pan3793/SPARK-46977.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/deploy/security/HadoopFSDelegationTokenProvider.scala      | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 9242fe82d249..8eb45238b477 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.security.HadoopDelegationTokenProvider
+import org.apache.spark.util.Utils
 
 private[deploy] class HadoopFSDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {
@@ -116,10 +117,10 @@ private[deploy] class HadoopFSDelegationTokenProvider
       if (fsToExclude.contains(fs.getUri.getHost)) {
         // YARN RM skips renewing token with empty renewer
         logInfo(s"getting token for: $fs with empty renewer to skip renewal")
-        fs.addDelegationTokens("", creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens("", creds) }
       } else {
         logInfo(s"getting token for: $fs with renewer $renewer")
-        fs.addDelegationTokens(renewer, creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens(renewer, creds) }
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
