This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d62c18b7497 [SPARK-41313][SPARK-3900][SPARK-21138] Combine fixes for SPARK-3900 and SPARK-21138
d62c18b7497 is described below

commit d62c18b7497997188ec587e1eb62e75c979c1c93
Author: Xing Lin <xing...@linkedin.com>
AuthorDate: Sun Dec 4 08:24:37 2022 -0600

    [SPARK-41313][SPARK-3900][SPARK-21138] Combine fixes for SPARK-3900 and SPARK-21138
    
    ### What changes were proposed in this pull request?
    
    SPARK-3900 fixed the IllegalStateException thrown by cleanupStagingDir in
    ApplicationMaster's shutdown hook. However, SPARK-21138 accidentally reverted
    that change while fixing the "Wrong FS" bug. We are now seeing SPARK-3900
    reported again by our users at LinkedIn, so we need to bring back the fix for
    SPARK-3900.
    
    The IllegalStateException when creating a new FileSystem object is due to a
    limitation in Hadoop: we cannot register a shutdown hook while shutdown is
    already in progress. When a Spark job fails during pre-launch,
    cleanupStagingDir is called as part of shutdown. If we then attempt to create
    a new FileSystem object for the first time, Hadoop tries to register a hook
    to shut down KeyProviderCache while creating a ClientContext for the
    DFSClient, and we hit the IllegalStateException. [...]
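
    Illustrative only: a minimal, self-contained Scala sketch of the pattern, not
    the actual ApplicationMaster code (the object name and the standalone main
    method below are hypothetical). The FileSystem is resolved eagerly, before the
    shutdown hook is registered, so the hook never forces Hadoop to register its
    own KeyProviderCache hook while the JVM is already shutting down.

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object StagingDirCleanupSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val stagingDirPath = new Path(sys.env("SPARK_YARN_STAGING_DIR"))
        // Pre-create the FileSystem now: Hadoop may register its own shutdown
        // hook at this point, which is only legal before shutdown has begun.
        val stagingDirFs: FileSystem = stagingDirPath.getFileSystem(conf)

        sys.addShutdownHook {
          // Safe during shutdown: reuses the pre-created FileSystem instead of
          // creating a new one (which could try to register another hook).
          stagingDirFs.delete(stagingDirPath, true)
        }
      }
    }
    ```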
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Closes #38832 from xinglin/SPARK-41313.
    
    Authored-by: Xing Lin <xing...@linkedin.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a7676fe24f6..69dd72720a5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -240,6 +240,9 @@ private[spark] class ApplicationMaster(
 
       logInfo("ApplicationAttemptId: " + appAttemptId)
 
+      // During shutdown, we may not be able to create a FileSystem object. So, pre-create it here.
+      val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+      val stagingDirFs = stagingDirPath.getFileSystem(yarnConf)
       // This shutdown hook should run *after* the SparkContext is shut down.
       val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
       ShutdownHookManager.addShutdownHook(priority) { () =>
@@ -261,14 +264,14 @@ private[spark] class ApplicationMaster(
           if (!unregistered) {
             // we only want to unregister if we don't want the RM to retry
             if (isLastAttempt) {
-              cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
+              cleanupStagingDir(stagingDirFs, stagingDirPath)
               unregister(finalStatus, finalMsg)
             } else if (finalStatus == FinalApplicationStatus.SUCCEEDED) {
              // When it's not the last attempt, if unregister failed caused by timeout exception,
              // YARN will rerun the application, AM should not clean staging dir before unregister
               // success.
               unregister(finalStatus, finalMsg)
-              cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
+              cleanupStagingDir(stagingDirFs, stagingDirPath)
             }
           }
         } catch {
@@ -686,11 +689,15 @@ private[spark] class ApplicationMaster(
    * Clean up the staging directory.
    */
   private def cleanupStagingDir(stagingDirPath: Path): Unit = {
+    val stagingDirFs = stagingDirPath.getFileSystem(yarnConf)
+    cleanupStagingDir(stagingDirFs, stagingDirPath)
+  }
+
+  private def cleanupStagingDir(fs: FileSystem, stagingDirPath: Path): Unit = {
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
         logInfo("Deleting staging directory " + stagingDirPath)
-        val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
       }
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
