Repository: spark
Updated Branches:
  refs/heads/master 2cd1bfa4f -> 9fcf1c51d


[SPARK-17623][CORE] Clarify type of TaskEndReason with a failed task.

## What changes were proposed in this pull request?

In TaskResultGetter, enqueueFailedTask currently deserializes the result
as a TaskEndReason. But the type is actually more specific: it's a
TaskFailedReason. This just leads to more blind casting later on; it
would be clearer if the message were cast to the right type immediately,
so method parameter types could be tightened.
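
For context, a minimal sketch of the relationship between the two types (simplified here; the real definitions live in org.apache.spark.TaskEndReason):

```scala
// Simplified sketch of the hierarchy, not the full Spark definitions.
sealed trait TaskEndReason                          // any task outcome, including Success
case object Success extends TaskEndReason

sealed trait TaskFailedReason extends TaskEndReason {
  def toErrorString: String                         // only failed tasks have an error string
  def countTowardsTaskFailures: Boolean = true
}

// Deserializing the failure message as the narrower type gives callers the
// failure-only members directly, without an asInstanceOf[TaskFailedReason] later:
// val reason = serializer.get().deserialize[TaskFailedReason](serializedData, loader)
```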

## How was this patch tested?

Existing unit tests via Jenkins. Note that the code was already performing a
blind cast to a TaskFailedReason before in any case, just in a different spot,
so there shouldn't be any behavior change.
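
Concretely, the cast just moves from the point of use to the point of deserialization; paraphrasing the TaskSetManager hunk below:

```scala
// Before: parameter typed as the broad TaskEndReason, cast at the point of use.
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason): Unit = {
  val failureReason = reason.asInstanceOf[TaskFailedReason].toErrorString
  // ...
}

// After: parameter tightened to TaskFailedReason, so no cast is needed.
def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason): Unit = {
  val failureReason = reason.toErrorString
  // ...
}
```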

Author: Imran Rashid <iras...@cloudera.com>

Closes #15181 from squito/SPARK-17623.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fcf1c51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fcf1c51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fcf1c51

Branch: refs/heads/master
Commit: 9fcf1c51d518847eda7f5ea71337cfa7def3c45c
Parents: 2cd1bfa
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Sep 21 17:49:36 2016 -0400
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Sep 21 17:49:36 2016 -0400

----------------------------------------------------------------------
 .../apache/spark/executor/CommitDeniedException.scala   |  4 ++--
 .../main/scala/org/apache/spark/executor/Executor.scala |  4 ++--
 .../org/apache/spark/scheduler/TaskResultGetter.scala   |  4 ++--
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala  |  2 +-
 .../org/apache/spark/scheduler/TaskSetManager.scala     | 12 +++---------
 .../org/apache/spark/shuffle/FetchFailedException.scala |  4 ++--
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala |  2 +-
 7 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index 7d84889..326e042 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.{TaskCommitDenied, TaskEndReason}
+import org.apache.spark.{TaskCommitDenied, TaskFailedReason}
 
 /**
  * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
@@ -29,5 +29,5 @@ private[spark] class CommitDeniedException(
     attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
+  def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fbf2b86..668ec41 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -355,7 +355,7 @@ private[spark] class Executor(
 
       } catch {
         case ffe: FetchFailedException =>
-          val reason = ffe.toTaskEndReason
+          val reason = ffe.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
@@ -370,7 +370,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
         case CausedBy(cDE: CommitDeniedException) =>
-          val reason = cDE.toTaskEndReason
+          val reason = cDE.toTaskFailedReason
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 685ef55..1c3fcbd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
-    var reason : TaskEndReason = UnknownReason
+    var reason : TaskFailedReason = UnknownReason
     try {
       getTaskResultExecutor.execute(new Runnable {
         override def run(): Unit = Utils.logUncaughtExceptions {
           val loader = Utils.getContextOrSparkClassLoader
           try {
             if (serializedData != null && serializedData.limit() > 0) {
-              reason = serializer.get().deserialize[TaskEndReason](
+              reason = serializer.get().deserialize[TaskFailedReason](
                 serializedData, loader)
             }
           } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ee5cbfe..52a7186 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskState: TaskState,
-      reason: TaskEndReason): Unit = synchronized {
+      reason: TaskFailedReason): Unit = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
     if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
       // Need to revive offers again now that the task set manager state has been updated to

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2fef447..226bed2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -696,7 +696,7 @@ private[spark] class TaskSetManager(
    * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
    * DAG Scheduler.
    */
-  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
+  def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
     val info = taskInfos(tid)
     if (info.failed || info.killed) {
       return
@@ -707,7 +707,7 @@ private[spark] class TaskSetManager(
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
-      reason.asInstanceOf[TaskFailedReason].toErrorString
+      reason.toErrorString
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
         logWarning(failureReason)
@@ -765,10 +765,6 @@ private[spark] class TaskSetManager(
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
-
-      case e: TaskEndReason =>
-        logError("Unknown TaskEndReason: " + e)
-        None
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -784,9 +780,7 @@ private[spark] class TaskSetManager(
       addPendingTask(index)
     }
 
-    if (!isZombie && state != TaskState.KILLED
-        && reason.isInstanceOf[TaskFailedReason]
-        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
+    if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index b2d050b..498c12e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.{FetchFailed, TaskFailedReason}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
@@ -45,7 +45,7 @@ private[spark] class FetchFailedException(
     this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
   }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+  def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
     Utils.exceptionString(this))
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fcf1c51/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c89be22..00314ab 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -146,7 +146,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
       "Some exception")
     val fetchMetadataFailed = new MetadataFetchFailedException(17,
-      19, "metadata Fetch failed exception").toTaskEndReason
+      19, "metadata Fetch failed exception").toTaskFailedReason
     val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)

