Repository: spark
Updated Branches:
  refs/heads/branch-1.4 bcb2c5d16 -> a17a0ee77


[SPARK-7503] [YARN] Resources in .sparkStaging directory can't be cleaned up on error

When we run applications on YARN in cluster mode, resources uploaded to the 
.sparkStaging directory are not cleaned up if uploading the local resources 
fails.

You can reproduce this issue by running the following command.
```
bin/spark-submit --master yarn --deploy-mode cluster --class <someClassName> <non-existing-jar>
```
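
For context, the essence of the fix is sketched below: wrap the submission path 
in a try/catch and, unless the user asked to preserve staging files, recursively 
delete the application's .sparkStaging subdirectory before rethrowing. This is a 
minimal illustrative sketch, not the patched `Client` itself; the object and 
method names are hypothetical, and it assumes hadoop-common on the classpath.

```
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingCleanupSketch {
  // Hypothetical stand-ins for fields the real Client reads from
  // SparkConf and the YARN/Hadoop configuration.
  val hadoopConf = new Configuration()
  val preserveFiles = false // spark.yarn.preserve.staging.files

  /** Best-effort removal of the staging directory of a failed submission. */
  def cleanupStagingDir(appStagingDir: String): Unit = {
    try {
      val stagingDirPath = new Path(appStagingDir)
      val fs = FileSystem.get(hadoopConf)
      if (!preserveFiles && fs.exists(stagingDirPath)) {
        println(s"Deleting staging directory $stagingDirPath")
        fs.delete(stagingDirPath, true) // recursive delete
      }
    } catch {
      case ioe: IOException =>
        println(s"Failed to cleanup staging dir $appStagingDir: $ioe")
    }
  }

  /** Run a submission step; on any failure, clean up and rethrow. */
  def submitWithCleanup[T](appStagingDir: String)(submit: => T): T = {
    try {
      submit
    } catch {
      case e: Throwable =>
        cleanupStagingDir(appStagingDir)
        throw e // propagate the original error after cleanup
    }
  }
}
```

The actual patch (diff below) inlines this logic directly in 
Client#submitApplication rather than factoring it out.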

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #6026 from sarutak/delete-uploaded-resources-on-error and squashes the 
following commits:

caef9f4 [Kousuke Saruta] Fixed style
882f921 [Kousuke Saruta] Wrapped Client#submitApplication with try/catch blocks in order to delete resources on error
1786ca4 [Kousuke Saruta] Merge branch 'master' of https://github.com/apache/spark into delete-uploaded-resources-on-error
f61071b [Kousuke Saruta] Fixed cleanup problem

(cherry picked from commit c64ff8036cc6bc7c87743f4c751d7fe91c2e366a)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a17a0ee7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a17a0ee7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a17a0ee7

Branch: refs/heads/branch-1.4
Commit: a17a0ee776921e53b4477f15d8c5101d02afc9d1
Parents: bcb2c5d
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Fri May 15 11:37:34 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri May 15 11:37:50 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 72 +++++++++++++-------
 1 file changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a17a0ee7/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d21a739..7e023f2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
@@ -91,30 +91,52 @@ private[spark] class Client(
    * available in the alpha API.
    */
   def submitApplication(): ApplicationId = {
-    // Setup the credentials before doing anything else, so we have don't have issues at any point.
-    setupCredentials()
-    yarnClient.init(yarnConf)
-    yarnClient.start()
-
-    logInfo("Requesting a new application from cluster with %d NodeManagers"
-      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
-
-    // Get a new application from our RM
-    val newApp = yarnClient.createApplication()
-    val newAppResponse = newApp.getNewApplicationResponse()
-    val appId = newAppResponse.getApplicationId()
-
-    // Verify whether the cluster has enough resources for our AM
-    verifyClusterResources(newAppResponse)
-
-    // Set up the appropriate contexts to launch our AM
-    val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
-
-    // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
-    yarnClient.submitApplication(appContext)
-    appId
+    var appId: ApplicationId = null
+    try {
+      // Setup the credentials before doing anything else,
+      // so we don't have issues at any point.
+      setupCredentials()
+      yarnClient.init(yarnConf)
+      yarnClient.start()
+
+      logInfo("Requesting a new application from cluster with %d NodeManagers"
+        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+      // Get a new application from our RM
+      val newApp = yarnClient.createApplication()
+      val newAppResponse = newApp.getNewApplicationResponse()
+      appId = newAppResponse.getApplicationId()
+
+      // Verify whether the cluster has enough resources for our AM
+      verifyClusterResources(newAppResponse)
+
+      // Set up the appropriate contexts to launch our AM
+      val containerContext = createContainerLaunchContext(newAppResponse)
+      val appContext = createApplicationSubmissionContext(newApp, containerContext)
+
+      // Finally, submit and monitor the application
+      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+      yarnClient.submitApplication(appContext)
+      appId
+    } catch {
+      case e: Throwable =>
+        if (appId != null) {
+          val appStagingDir = getAppStagingDir(appId)
+          try {
+            val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+            val stagingDirPath = new Path(appStagingDir)
+            val fs = FileSystem.get(hadoopConf)
+            if (!preserveFiles && fs.exists(stagingDirPath)) {
+              logInfo("Deleting staging directory " + stagingDirPath)
+              fs.delete(stagingDirPath, true)
+            }
+          } catch {
+            case ioe: IOException =>
+              logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+          }
+        }
+        throw e
+    }
   }
 
   /**

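Note that the new cleanup path still honors spark.yarn.preserve.staging.files, 
so a failed submission's staging directory can be kept for inspection. For 
example, re-running the repro command above with that setting (same 
placeholders) should leave the partially uploaded files in place:

```
bin/spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.preserve.staging.files=true \
  --class <someClassName> <non-existing-jar>
```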

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
