Repository: spark
Updated Branches:
  refs/heads/master 2ad276954 -> b47927814


[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before 
self-kill

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17451

`CoarseGrainedExecutorBackend` exits the JVM in some failure cases. While this 
does not cause any functional issue, no specific reason for the exit is 
captured in the driver UI. In this PR, I am adding functionality to 
`exitExecutor` to notify the driver that the executor is exiting.

## How was this patch tested?

Ran the change in a test environment and took down the shuffle service before 
the executor could register with it. In the driver logs, where the job failure 
reason is mentioned (i.e. `Job aborted due to stage ...`), it gives the correct 
reason:

Before:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to containers 
exceeding thresholds, or network issues. Check driver logs for WARN messages.`

After:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running 
tasks) Reason: Unable to create executor due to 
java.util.concurrent.TimeoutException: Timeout waiting for task.`

Author: Tejas Patil <tej...@fb.com>

Closes #15013 from tejasapatil/SPARK-17451_inform_driver.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4792781
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4792781
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4792781

Branch: refs/heads/master
Commit: b479278142728eb003b9ee466fab0e8d6ec4b13d
Parents: 2ad2769
Author: Tejas Patil <tej...@fb.com>
Authored: Thu Sep 15 10:23:41 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Sep 15 10:23:41 2016 -0700

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala | 26 +++++++++++++++-----
 .../org/apache/spark/storage/BlockManager.scala |  3 +++
 2 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4792781/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d..7eec4ae 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, 
notifyDriver = false)
     }(ThreadUtils.sameThread)
   }
 
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", 
null,
+        notifyDriver = false)
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = 
null) = {
+  protected def exitExecutor(code: Int,
+                             reason: String,
+                             throwable: Throwable = null,
+                             notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
     if (throwable != null) {
-      logError(reason, throwable)
+      logError(message, throwable)
     } else {
-      logError(reason)
+      logError(message)
     }
+
+    if (notifyDriver && driver.nonEmpty) {
+      driver.get.ask[Boolean](
+        RemoveExecutor(executorId, new ExecutorLossReason(reason))
+      ).onFailure { case e =>
+        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+      }(ThreadUtils.sameThread)
+    }
+
     System.exit(code)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4792781/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a724fdf..c172ac2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
           logError(s"Failed to connect to external shuffle server, will retry 
${MAX_ATTEMPTS - i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000)
+        case NonFatal(e) =>
+          throw new SparkException("Unable to register with external shuffle 
server due to : " +
+            e.getMessage, e)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to