Repository: spark
Updated Branches:
  refs/heads/master f54ff19b1 -> b2e4b314d


[SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed.

Author: Mark Grover <grover.markgro...@gmail.com>

Closes #8093 from markgrover/nm2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e4b314
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e4b314
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e4b314

Branch: refs/heads/master
Commit: b2e4b314d989de8cad012bbddba703b31d8378a4
Parents: f54ff19
Author: Mark Grover <grover.markgro...@gmail.com>
Authored: Tue Nov 3 08:51:40 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Nov 3 08:51:40 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TaskEndReason.scala |  8 ++++++--
 .../main/scala/org/apache/spark/rpc/RpcEndpointRef.scala |  4 ++--
 .../org/apache/spark/scheduler/TaskSetManager.scala      |  4 ++--
 .../cluster/CoarseGrainedSchedulerBackend.scala          |  5 +++--
 .../spark/scheduler/cluster/YarnSchedulerBackend.scala   |  1 +
 .../main/scala/org/apache/spark/util/JsonProtocol.scala  | 11 ++++++++---
 .../apache/spark/ui/jobs/JobProgressListenerSuite.scala  |  2 +-
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala  | 11 ++++++-----
 8 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 18278b2..13241b7 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -223,8 +223,10 @@ case class TaskCommitDenied(
  * the task crashed the JVM.
  */
 @DeveloperApi
-case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
-  extends TaskFailedReason {
+case class ExecutorLostFailure(
+    execId: String,
+    exitCausedByApp: Boolean = true,
+    reason: Option[String]) extends TaskFailedReason {
   override def toErrorString: String = {
     val exitBehavior = if (exitCausedByApp) {
       "caused by one of the running tasks"
@@ -232,6 +234,8 @@ case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
       "unrelated to the running tasks"
     }
     s"ExecutorLostFailure (executor ${execId} exited due to an issue 
${exitBehavior})"
+    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
+      reason.map { r => s" Reason: $r" }.getOrElse("")
   }
 
   override def countTowardsTaskFailures: Boolean = exitCausedByApp

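A note on the hunk above: since a Scala method returns its last expression, the pre-existing interpolated string is kept as a discarded, side-effect-free expression, and only the new string, with the optional reason appended, is actually returned. Below is a self-contained sketch of the effective behavior; the case class is a local stand-in for illustration, not the Spark source itself.

// Local stand-in mirroring the patched ExecutorLostFailure; not the Spark class.
case class ExecutorLostFailure(
    execId: String,
    exitCausedByApp: Boolean = true,
    reason: Option[String]) {
  def toErrorString: String = {
    val exitBehavior = if (exitCausedByApp) {
      "caused by one of the running tasks"
    } else {
      "unrelated to the running tasks"
    }
    // The optional reason is appended only when one was reported.
    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
      reason.map { r => s" Reason: $r" }.getOrElse("")
  }
}

object ToErrorStringDemo extends App {
  // A hypothetical NodeManager-style reason string:
  println(ExecutorLostFailure("1", true,
    Some("Container killed by YARN for exceeding memory limits.")).toErrorString)
  // With no reason, the "Reason:" suffix is simply omitted:
  println(ExecutorLostFailure("1", false, None).toErrorString)
}
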
http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index f25710b..623da3e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -67,7 +67,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
    * method retries, the message handling in the receiver side should be idempotent.
    *
-   * Note: this is a blocking action which may cost a lot of time,  so don't call it in an message
+   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
    * loop of [[RpcEndpoint]].
    *
    * @param message the message to send
@@ -82,7 +82,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
    * retries, the message handling in the receiver side should be idempotent.
    *
-   * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
    * loop of [[RpcEndpoint]].
    *
    * @param message the message to send

http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b3fad9..114468c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -802,8 +802,8 @@ private[spark] class TaskSetManager(
         case exited: ExecutorExited => exited.exitCausedByApp
         case _ => true
       }
-      handleFailedTask(
-        tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
+      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
+        Some(reason.toString)))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()

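For context, the `reason` threaded through here is the ExecutorLossReason delivered to executorLost, whose toString is its message; it now rides along into the task failure that the WebUI displays. A standalone sketch of that propagation, using simplified stand-ins for the scheduler's loss-reason types (the real ones live in org.apache.spark.scheduler):

// Simplified stand-ins, not the org.apache.spark.scheduler classes.
sealed abstract class ExecutorLossReason(val message: String) {
  override def toString: String = message
}
case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, msg: String)
  extends ExecutorLossReason(msg)
case class SlaveLost(msg: String) extends ExecutorLossReason(msg)

case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean, reason: Option[String])

object ExecutorLostDemo extends App {
  // Mirrors the match in executorLost: only ExecutorExited knows whether the
  // application caused the exit; any other loss reason is blamed on the app.
  def failureFor(execId: String, reason: ExecutorLossReason): ExecutorLostFailure = {
    val exitCausedByApp = reason match {
      case exited: ExecutorExited => exited.exitCausedByApp
      case _ => true
    }
    ExecutorLostFailure(execId, exitCausedByApp, Some(reason.toString))
  }

  println(failureFor("3", SlaveLost("Remote RPC client disassociated.")))
  println(failureFor("3", ExecutorExited(137, true, "Container marked as failed.")))
}
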
http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 439a119..ebce502 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -125,7 +125,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor 
$executorId.")
         }
-
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,7 +194,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       addressToExecutorId
         .get(remoteAddress)
-        .foreach(removeExecutor(_, SlaveLost("remote Rpc client 
disassociated")))
+        .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. 
Likely due to " +
+          "containers exceeding thresholds, or network issues. Check driver 
logs for WARN " +
+          "messages.")))
     }
 
     // Make fake resource offers on just one executor

http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cb24072..d75d6f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -175,6 +175,7 @@ private[spark] abstract class YarnSchedulerBackend(
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case RemoveExecutor(executorId, reason) =>
+        logWarning(reason.toString)
         removeExecutor(executorId, reason)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ad6615c..ee2eb58 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,10 @@ private[spark] object JsonProtocol {
         ("Job ID" -> taskCommitDenied.jobID) ~
         ("Partition ID" -> taskCommitDenied.partitionID) ~
         ("Attempt Number" -> taskCommitDenied.attemptNumber)
-      case ExecutorLostFailure(executorId, exitCausedByApp) =>
+      case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
         ("Executor ID" -> executorId) ~
-        ("Exit Caused By App" -> exitCausedByApp)
+        ("Exit Caused By App" -> exitCausedByApp) ~
+        ("Loss Reason" -> reason.map(_.toString))
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -812,7 +813,11 @@ private[spark] object JsonProtocol {
       case `executorLostFailure` =>
         val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By 
App").map(_.extract[Boolean])
         val executorId = Utils.jsonOption(json \ "Executor 
ID").map(_.extract[String])
-        ExecutorLostFailure(executorId.getOrElse("Unknown"), 
exitCausedByApp.getOrElse(true))
+        val reason = Utils.jsonOption(json \ "Loss 
Reason").map(_.extract[String])
+        ExecutorLostFailure(
+          executorId.getOrElse("Unknown"),
+          exitCausedByApp.getOrElse(true),
+          reason)
       case `unknownReason` => UnknownReason
     }
   }

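Because the new field is written from an Option, a missing reason renders as JNothing and the "Loss Reason" key is omitted from the JSON entirely; on the read side, event logs written by older Spark versions simply yield None, which is what makes the backward-compatible parse above work. A minimal json4s sketch of that round trip (json4s is the library JsonProtocol builds on; the demo object and helper names below are illustrative, not Spark's):

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object LossReasonJsonDemo extends App {
  implicit val formats: Formats = DefaultFormats

  // Writing: a None reason becomes JNothing, so the field is omitted
  // from the output rather than written as null.
  def write(execId: String, exitCausedByApp: Boolean, reason: Option[String]): JValue =
    ("Executor ID" -> execId) ~
    ("Exit Caused By App" -> exitCausedByApp) ~
    ("Loss Reason" -> reason)

  // Reading: for old events, (json \ "Loss Reason") is JNothing, which
  // toOption maps to None instead of failing.
  def readReason(json: JValue): Option[String] =
    (json \ "Loss Reason").toOption.map(_.extract[String])

  val newEvent = write("100", true, Some("Induced failure"))
  val oldEvent = write("100", true, None)
  println(compact(render(newEvent)))  // includes "Loss Reason"
  println(readReason(newEvent))       // Some(Induced failure)
  println(readReason(oldEvent))       // None
}
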
http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index b140387..e02f5a1 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -243,7 +243,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       ExceptionFailure("Exception", "description", null, null, None, None),
       TaskResultLost,
       TaskKilled,
-      ExecutorLostFailure("0"),
+      ExecutorLostFailure("0", true, Some("Induced failure")),
       UnknownReason)
     var failCount = 0
     for (reason <- taskFailedReasons) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 86137f2..953456c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -152,7 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
     testTaskEndReason(TaskCommitDenied(2, 3, 4))
-    testTaskEndReason(ExecutorLostFailure("100", true))
+    testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced 
failure")))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -296,10 +296,10 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("ExecutorLostFailure backward compatibility") {
     // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" 
property.
-    val executorLostFailure = ExecutorLostFailure("100", true)
+    val executorLostFailure = ExecutorLostFailure("100", true, Some("Induced 
failure"))
     val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
       .removeField({ _._1 == "Executor ID" })
-    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
+    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true, 
Some("Induced failure"))
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
@@ -603,10 +603,11 @@ class JsonProtocolSuite extends SparkFunSuite {
         assert(jobId1 === jobId2)
         assert(partitionId1 === partitionId2)
         assert(attemptNumber1 === attemptNumber2)
-      case (ExecutorLostFailure(execId1, exit1CausedByApp),
-          ExecutorLostFailure(execId2, exit2CausedByApp)) =>
+      case (ExecutorLostFailure(execId1, exit1CausedByApp, reason1),
+          ExecutorLostFailure(execId2, exit2CausedByApp, reason2)) =>
         assert(execId1 === execId2)
         assert(exit1CausedByApp === exit2CausedByApp)
+        assert(reason1 === reason2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
