Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18248626
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
    --- End diff --
    
    I see, though this assumes there isn't much code to execute after the catch, which happens to be true in our case but not in general. I think it makes more sense to catch only `Exception`s here and let `Throwable`s surface.
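    
    Concretely, something like this (just a sketch, keeping the surrounding code from the diff; only the pattern in the `catch` changes):
    
    ```scala
    } catch {
      case e: Exception =>
        // Recoverable failure: log it, report FAILED, and finish cleanly.
        logError("Uncaught exception: ", e)
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
          "Uncaught exception: " + e.getMessage())
      // No case for other Throwables: fatal errors (e.g. OutOfMemoryError)
      // propagate and take down the JVM rather than being swallowed here.
    }
    ```
    
    `scala.util.control.NonFatal` (i.e. `case NonFatal(e) =>`) would behave similarly and additionally lets `InterruptedException` propagate.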

