Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ba8352c76 -> ff0a7f400


[SPARK-6286][minor] Handle missing Mesos case TASK_ERROR.

Author: Iulian Dragos <jagua...@gmail.com>

Closes #5000 from dragos/issue/task-error-case and squashes the following commits:

e063627 [Iulian Dragos] Handle TASK_ERROR in Mesos scheduler backends.
ac17cf0 [Iulian Dragos] Handle missing Mesos case TASK_ERROR.

(cherry picked from commit 9d112a958ee2facad179344dd367a6d1ccbc9614)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff0a7f40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff0a7f40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff0a7f40

Branch: refs/heads/branch-1.3
Commit: ff0a7f400a0d8d7ddba4a0a09ee7c0647a888e00
Parents: ba8352c
Author: Iulian Dragos <jagua...@gmail.com>
Authored: Wed Mar 18 09:15:33 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Mar 18 09:15:44 2015 -0400

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TaskState.scala    |  1 +
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala     | 12 ++----------
 .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 10 +---------
 3 files changed, 4 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff0a7f40/core/src/main/scala/org/apache/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 0bf1e4a..d85a6d6 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -46,5 +46,6 @@ private[spark] object TaskState extends Enumeration {
     case MesosTaskState.TASK_FAILED => FAILED
     case MesosTaskState.TASK_KILLED => KILLED
     case MesosTaskState.TASK_LOST => LOST
+    case MesosTaskState.TASK_ERROR => LOST
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff0a7f40/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 90dfe14..fc92b9c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{Utils, AkkaUtils}
@@ -262,20 +262,12 @@ private[spark] class CoarseMesosSchedulerBackend(
       .build()
   }
 
-  /** Check whether a Mesos task state represents a finished task */
-  private def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo("Mesos task " + taskId + " is now " + state)
     synchronized {
-      if (isFinished(state)) {
+      if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId
         taskIdToSlaveId -= taskId

http://git-wip-us.apache.org/repos/asf/spark/blob/ff0a7f40/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index cfb6592..df8f430 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -313,14 +313,6 @@ private[spark] class MesosSchedulerBackend(
       .build()
   }
 
-  /** Check whether a Mesos task state represents a finished task */
-  def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     inClassLoader() {
       val tid = status.getTaskId.getValue.toLong
@@ -330,7 +322,7 @@ private[spark] class MesosSchedulerBackend(
           // We lost the executor on this slave, so remember that it's gone
           removeExecutor(taskIdToSlaveId(tid), "Lost executor")
         }
-        if (isFinished(status.getState)) {
+        if (TaskState.isFinished(state)) {
           taskIdToSlaveId.remove(tid)
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to