Repository: spark Updated Branches: refs/heads/master f54ff19b1 -> b2e4b314d
[SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed. Author: Mark Grover <grover.markgro...@gmail.com> Closes #8093 from markgrover/nm2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e4b314 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e4b314 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e4b314 Branch: refs/heads/master Commit: b2e4b314d989de8cad012bbddba703b31d8378a4 Parents: f54ff19 Author: Mark Grover <grover.markgro...@gmail.com> Authored: Tue Nov 3 08:51:40 2015 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Nov 3 08:51:40 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 8 ++++++-- .../main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 4 ++-- .../org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- .../spark/scheduler/cluster/YarnSchedulerBackend.scala | 1 + .../main/scala/org/apache/spark/util/JsonProtocol.scala | 11 ++++++++--- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 11 ++++++----- 8 files changed, 29 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/TaskEndReason.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 18278b2..13241b7 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -223,8 +223,10 @@ case class TaskCommitDenied( * the task crashed the JVM. */ @DeveloperApi -case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true) - extends TaskFailedReason { +case class ExecutorLostFailure( + execId: String, + exitCausedByApp: Boolean = true, + reason: Option[String]) extends TaskFailedReason { override def toErrorString: String = { val exitBehavior = if (exitCausedByApp) { "caused by one of the running tasks" @@ -232,6 +234,8 @@ case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true) "unrelated to the running tasks" } s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})" + s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" + + reason.map { r => s" Reason: $r" }.getOrElse("") } override def countTowardsTaskFailures: Boolean = exitCausedByApp http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index f25710b..623da3e 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -67,7 +67,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this * method retries, the message handling in the receiver side should be idempotent. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in an message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send @@ -82,7 +82,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method * retries, the message handling in the receiver side should be idempotent. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in an message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 9b3fad9..114468c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -802,8 +802,8 @@ private[spark] class TaskSetManager( case exited: ExecutorExited => exited.exitCausedByApp case _ => true } - handleFailedTask( - tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp)) + handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp, + Some(reason.toString))) } // recalculate valid locality levels and waits when executor is lost recomputeLocality() http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 439a119..ebce502 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -125,7 +125,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } - } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -195,7 +194,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def onDisconnected(remoteAddress: RpcAddress): Unit = { addressToExecutorId .get(remoteAddress) - .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated"))) + .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " + + "containers exceeding thresholds, or network issues. Check driver logs for WARN " + + "messages."))) } // Make fake resource offers on just one executor http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index cb24072..d75d6f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -175,6 +175,7 @@ private[spark] abstract class YarnSchedulerBackend( addWebUIFilter(filterName, filterParams, proxyBase) case RemoveExecutor(executorId, reason) => + logWarning(reason.toString) removeExecutor(executorId, reason) } http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ad6615c..ee2eb58 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -367,9 +367,10 @@ private[spark] object JsonProtocol { ("Job ID" -> taskCommitDenied.jobID) ~ ("Partition ID" -> taskCommitDenied.partitionID) ~ ("Attempt Number" -> taskCommitDenied.attemptNumber) - case ExecutorLostFailure(executorId, exitCausedByApp) => + case ExecutorLostFailure(executorId, exitCausedByApp, reason) => ("Executor ID" -> executorId) ~ - ("Exit Caused By App" -> exitCausedByApp) + ("Exit Caused By App" -> exitCausedByApp) ~ + ("Loss Reason" -> reason.map(_.toString)) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -812,7 +813,11 @@ private[spark] object JsonProtocol { case `executorLostFailure` => val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean]) val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String]) - ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true)) + val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String]) + ExecutorLostFailure( + executorId.getOrElse("Unknown"), + exitCausedByApp.getOrElse(true), + reason) case `unknownReason` => UnknownReason } } http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index b140387..e02f5a1 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -243,7 +243,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with ExceptionFailure("Exception", "description", null, null, None, None), TaskResultLost, TaskKilled, - ExecutorLostFailure("0"), + ExecutorLostFailure("0", true, Some("Induced failure")), UnknownReason) var failCount = 0 for (reason <- taskFailedReasons) { http://git-wip-us.apache.org/repos/asf/spark/blob/b2e4b314/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 86137f2..953456c 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -152,7 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled) testTaskEndReason(TaskCommitDenied(2, 3, 4)) - testTaskEndReason(ExecutorLostFailure("100", true)) + testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure"))) testTaskEndReason(UnknownReason) // BlockId @@ -296,10 +296,10 @@ class JsonProtocolSuite extends SparkFunSuite { test("ExecutorLostFailure backward compatibility") { // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property. - val executorLostFailure = ExecutorLostFailure("100", true) + val executorLostFailure = ExecutorLostFailure("100", true, Some("Induced failure")) val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure) .removeField({ _._1 == "Executor ID" }) - val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true) + val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true, Some("Induced failure")) assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent)) } @@ -603,10 +603,11 @@ class JsonProtocolSuite extends SparkFunSuite { assert(jobId1 === jobId2) assert(partitionId1 === partitionId2) assert(attemptNumber1 === attemptNumber2) - case (ExecutorLostFailure(execId1, exit1CausedByApp), - ExecutorLostFailure(execId2, exit2CausedByApp)) => + case (ExecutorLostFailure(execId1, exit1CausedByApp, reason1), + ExecutorLostFailure(execId2, exit2CausedByApp, reason2)) => assert(execId1 === execId2) assert(exit1CausedByApp === exit2CausedByApp) + assert(reason1 === reason2) case (UnknownReason, UnknownReason) => case _ => fail("Task end reasons don't match in types!") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org