Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5e6fdc659 -> 0579f28df


[SPARK-8625] [CORE] Propagate user exceptions in tasks back to driver

This allows clients to retrieve the original exception from the
cause field of the SparkException that is thrown by the driver.
If the original exception is not in fact Serializable then it will
not be returned, but the message and stacktrace will be. (All Java
Throwables implement the Serializable interface, but this is no
guarantee that a particular implementation can actually be
serialized.)

Author: Tom White <t...@cloudera.com>

Closes #7014 from tomwhite/propagate-user-exceptions.

(cherry picked from commit 2e680668f7b6fc158aa068aedd19c1878ecf759e)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0579f28d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0579f28d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0579f28d

Branch: refs/heads/branch-1.5
Commit: 0579f28df246e9b7cb88bb848c7fb4e8607b8049
Parents: 5e6fdc6
Author: Tom White <t...@cloudera.com>
Authored: Wed Aug 12 10:06:27 2015 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Aug 12 10:08:19 2015 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  | 44 ++++++++++++-
 .../org/apache/spark/executor/Executor.scala    | 14 ++++-
 .../apache/spark/scheduler/DAGScheduler.scala   | 44 ++++++++-----
 .../spark/scheduler/DAGSchedulerEvent.scala     |  3 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 12 ++--
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 .../spark/ExecutorAllocationManagerSuite.scala  |  2 +-
 .../scala/org/apache/spark/FailureSuite.scala   | 66 +++++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  5 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  3 +-
 12 files changed, 165 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala 
b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 48fd3e7..934d00d 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -90,6 +92,10 @@ case class FetchFailed(
  *
  * `fullStackTrace` is a better representation of the stack trace because it 
contains the whole
  * stack trace including the exception and its causes
+ *
+ * `exception` is the actual exception that caused the task to fail. It may be 
`None` in
+ * the case that the exception is not in fact serializable. If a task fails 
more than
+ * once (due to retries), `exception` is that one that caused the last failure.
  */
 @DeveloperApi
 case class ExceptionFailure(
@@ -97,11 +103,26 @@ case class ExceptionFailure(
     description: String,
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
-    metrics: Option[TaskMetrics])
+    metrics: Option[TaskMetrics],
+    private val exceptionWrapper: Option[ThrowableSerializationWrapper])
   extends TaskFailedReason {
 
+  /**
+   * `preserveCause` is used to keep the exception itself so it is available 
to the
+   * driver. This may be set to `false` in the event that the exception is not 
in fact
+   * serializable.
+   */
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], 
preserveCause: Boolean) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, 
Utils.exceptionString(e), metrics,
+      if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+  }
+
   private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
-    this(e.getClass.getName, e.getMessage, e.getStackTrace, 
Utils.exceptionString(e), metrics)
+    this(e, metrics, preserveCause = true)
+  }
+
+  def exception: Option[Throwable] = exceptionWrapper.flatMap {
+    (w: ThrowableSerializationWrapper) => Option(w.exception)
   }
 
   override def toErrorString: String =
@@ -128,6 +149,25 @@ case class ExceptionFailure(
 }
 
 /**
+ * A class for recovering from exceptions when deserializing a Throwable that 
was
+ * thrown in user task code. If the Throwable cannot be deserialized it will 
be null,
+ * but the stacktrace and message will be preserved correctly in 
SparkException.
+ */
+private[spark] class ThrowableSerializationWrapper(var exception: Throwable) 
extends
+    Serializable with Logging {
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.writeObject(exception)
+  }
+  private def readObject(in: ObjectInputStream): Unit = {
+    try {
+      exception = in.readObject().asInstanceOf[Throwable]
+    } catch {
+      case e : Exception => log.warn("Task exception could not be 
deserialized", e)
+    }
+  }
+}
+
+/**
  * :: DeveloperApi ::
  * The task finished successfully, but the result was lost from the executor's 
block manager before
  * it was fetched.

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5d78a9d..42a85e4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import java.io.File
+import java.io.{File, NotSerializableException}
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
@@ -305,8 +305,16 @@ private[spark] class Executor(
               m
             }
           }
-          val taskEndReason = new ExceptionFailure(t, metrics)
-          execBackend.statusUpdate(taskId, TaskState.FAILED, 
ser.serialize(taskEndReason))
+          val serializedTaskEndReason = {
+            try {
+              ser.serialize(new ExceptionFailure(t, metrics))
+            } catch {
+              case _: NotSerializableException =>
+                // t is not serializable so just send the stacktrace
+                ser.serialize(new ExceptionFailure(t, metrics, false))
+            }
+          }
+          execBackend.statusUpdate(taskId, TaskState.FAILED, 
serializedTaskEndReason)
 
           // Don't forcibly exit unless the exception was inherently fatal, to 
avoid
           // stopping other tasks unnecessarily.

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bb489c6..7ab5ccf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -200,8 +200,8 @@ class DAGScheduler(
 
   // Called by TaskScheduler to cancel an entire TaskSet due to either 
repeated failures or
   // cancellation of the job itself.
-  def taskSetFailed(taskSet: TaskSet, reason: String): Unit = {
-    eventProcessLoop.post(TaskSetFailed(taskSet, reason))
+  def taskSetFailed(taskSet: TaskSet, reason: String, exception: 
Option[Throwable]): Unit = {
+    eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
   }
 
   private[scheduler]
@@ -677,8 +677,11 @@ class DAGScheduler(
     submitWaitingStages()
   }
 
-  private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) 
{
-    stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) }
+  private[scheduler] def handleTaskSetFailed(
+      taskSet: TaskSet,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
+    stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, 
exception) }
     submitWaitingStages()
   }
 
@@ -762,7 +765,7 @@ class DAGScheduler(
         }
       }
     } else {
-      abortStage(stage, "No active job for stage " + stage.id)
+      abortStage(stage, "No active job for stage " + stage.id, None)
     }
   }
 
@@ -816,7 +819,7 @@ class DAGScheduler(
       case NonFatal(e) =>
         stage.makeNewStageAttempt(partitionsToCompute.size)
         listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, 
properties))
-        abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+        abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}", Some(e))
         runningStages -= stage
         return
     }
@@ -845,13 +848,13 @@ class DAGScheduler(
     } catch {
       // In the case of a failure during serialization, abort the stage.
       case e: NotSerializableException =>
-        abortStage(stage, "Task not serializable: " + e.toString)
+        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
         runningStages -= stage
 
         // Abort execution
         return
       case NonFatal(e) =>
-        abortStage(stage, s"Task serialization failed: 
$e\n${e.getStackTraceString}")
+        abortStage(stage, s"Task serialization failed: 
$e\n${e.getStackTraceString}", Some(e))
         runningStages -= stage
         return
     }
@@ -878,7 +881,7 @@ class DAGScheduler(
       }
     } catch {
       case NonFatal(e) =>
-        abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+        abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}", Some(e))
         runningStages -= stage
         return
     }
@@ -1098,7 +1101,8 @@ class DAGScheduler(
           }
 
           if (disallowStageRetryForTest) {
-            abortStage(failedStage, "Fetch failure will not retry stage due to 
testing config")
+            abortStage(failedStage, "Fetch failure will not retry stage due to 
testing config",
+              None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed 
isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1126,7 +1130,7 @@ class DAGScheduler(
       case commitDenied: TaskCommitDenied =>
         // Do nothing here, left up to the TaskScheduler to decide how to 
handle denied commits
 
-      case ExceptionFailure(className, description, stackTrace, 
fullStackTrace, metrics) =>
+      case exceptionFailure: ExceptionFailure =>
         // Do nothing here, left up to the TaskScheduler to decide how to 
handle user failures
 
       case TaskResultLost =>
@@ -1235,7 +1239,10 @@ class DAGScheduler(
    * Aborts all jobs depending on a particular Stage. This is called in 
response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this 
event from outside.
    */
-  private[scheduler] def abortStage(failedStage: Stage, reason: String) {
+  private[scheduler] def abortStage(
+      failedStage: Stage,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
     if (!stageIdToStage.contains(failedStage.id)) {
       // Skip all the actions if the stage has been removed.
       return
@@ -1244,7 +1251,7 @@ class DAGScheduler(
       activeJobs.filter(job => stageDependsOn(job.finalStage, 
failedStage)).toSeq
     failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
     for (job <- dependentJobs) {
-      failJobAndIndependentStages(job, s"Job aborted due to stage failure: 
$reason")
+      failJobAndIndependentStages(job, s"Job aborted due to stage failure: 
$reason", exception)
     }
     if (dependentJobs.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs 
depending on it are done")
@@ -1252,8 +1259,11 @@ class DAGScheduler(
   }
 
   /** Fails a job and all stages that are only used by that job, and cleans up 
relevant state. */
-  private def failJobAndIndependentStages(job: ActiveJob, failureReason: 
String) {
-    val error = new SparkException(failureReason)
+  private def failJobAndIndependentStages(
+      job: ActiveJob,
+      failureReason: String,
+      exception: Option[Throwable] = None): Unit = {
+    val error = new SparkException(failureReason, exception.getOrElse(null))
     var ableToCancelStages = true
 
     val shouldInterruptThread =
@@ -1462,8 +1472,8 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case completion @ CompletionEvent(task, reason, _, _, taskInfo, 
taskMetrics) =>
       dagScheduler.handleTaskCompletion(completion)
 
-    case TaskSetFailed(taskSet, reason) =>
-      dagScheduler.handleTaskSetFailed(taskSet, reason)
+    case TaskSetFailed(taskSet, reason, exception) =>
+      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
 
     case ResubmitFailedStages =>
       dagScheduler.resubmitFailedStages()

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a213d41..f72a52e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,6 +73,7 @@ private[scheduler] case class ExecutorAdded(execId: String, 
host: String) extend
 private[scheduler] case class ExecutorLost(execId: String) extends 
DAGSchedulerEvent
 
 private[scheduler]
-case class TaskSetFailed(taskSet: TaskSet, reason: String) extends 
DAGSchedulerEvent
+case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: 
Option[Throwable])
+  extends DAGSchedulerEvent
 
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 82455b0..818b95d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -662,7 +662,7 @@ private[spark] class TaskSetManager(
 
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID 
$tid, ${info.host}): " +
       reason.asInstanceOf[TaskFailedReason].toErrorString
-    reason match {
+    val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
         logWarning(failureReason)
         if (!successful(index)) {
@@ -671,6 +671,7 @@ private[spark] class TaskSetManager(
         }
         // Not adding to failed executors for FetchFailed.
         isZombie = true
+        None
 
       case ef: ExceptionFailure =>
         taskMetrics = ef.metrics.orNull
@@ -706,12 +707,15 @@ private[spark] class TaskSetManager(
             s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on 
executor ${info.host}: " +
             s"${ef.className} (${ef.description}) [duplicate $dupCount]")
         }
+        ef.exception
 
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
+        None
 
       case e: TaskEndReason =>
         logError("Unknown TaskEndReason: " + e)
+        None
     }
     // always add to failed executors
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
@@ -728,16 +732,16 @@ private[spark] class TaskSetManager(
         logError("Task %d in stage %s failed %d times; aborting job".format(
           index, taskSet.id, maxTaskFailures))
         abort("Task %d in stage %s failed %d times, most recent failure: 
%s\nDriver stacktrace:"
-          .format(index, taskSet.id, maxTaskFailures, failureReason))
+          .format(index, taskSet.id, maxTaskFailures, failureReason), 
failureException)
         return
       }
     }
     maybeFinishTaskSet()
   }
 
-  def abort(message: String): Unit = sched.synchronized {
+  def abort(message: String, exception: Option[Throwable] = None): Unit = 
sched.synchronized {
     // TODO: Kill running tasks if we were not terminated due to a Mesos error
-    sched.dagScheduler.taskSetFailed(taskSet, message)
+    sched.dagScheduler.taskSetFailed(taskSet, message, exception)
     isZombie = true
     maybeFinishTaskSet()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c600319..cbc94fd 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -790,7 +790,7 @@ private[spark] object JsonProtocol {
         val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
           map(_.extract[String]).orNull
         val metrics = Utils.jsonOption(json \ 
"Metrics").map(taskMetricsFromJson)
-        ExceptionFailure(className, description, stackTrace, fullStackTrace, 
metrics)
+        ExceptionFailure(className, description, stackTrace, fullStackTrace, 
metrics, None)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `executorLostFailure` =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f374f97..116f027 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -800,7 +800,7 @@ class ExecutorAllocationManagerSuite
     assert(maxNumExecutorsNeeded(manager) === 1)
 
     // If the task is failed, we expect it to be resubmitted later.
-    val taskEndReason = ExceptionFailure(null, null, null, null, null)
+    val taskEndReason = ExceptionFailure(null, null, null, null, null, None)
     sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, 
taskInfo, null))
     assert(maxNumExecutorsNeeded(manager) === 1)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala 
b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 69cb4b4..aa50a49 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import org.apache.spark.util.NonSerializable
 
-import java.io.NotSerializableException
+import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
 // for this because any local variables used in the task closures will 
rightfully
@@ -166,5 +166,69 @@ class FailureSuite extends SparkFunSuite with 
LocalSparkContext {
     assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
   }
 
+  // Run a 3-task map job in which task 1 always fails with a exception 
message that
+  // depends on the failure number, and check that we get the last failure.
+  test("last failure cause is sent back to driver") {
+    sc = new SparkContext("local[1,2]", "test")
+    val data = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 3) {
+          FailureSuiteState.tasksFailed += 1
+          throw new UserException("oops",
+            new IllegalArgumentException("failed=" + 
FailureSuiteState.tasksFailed))
+        }
+      }
+      x * x
+    }
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause.getClass === classOf[UserException])
+    assert(thrown.getCause.getMessage === "oops")
+    assert(thrown.getCause.getCause.getClass === 
classOf[IllegalArgumentException])
+    assert(thrown.getCause.getCause.getMessage === "failed=2")
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not 
serializable") {
+    sc = new SparkContext("local", "test")
+    val thrown = intercept[SparkException] {
+      sc.makeRDD(1 to 3).foreach { _ => throw new NonSerializableUserException 
}
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonSerializableUserException"))
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not 
deserializable") {
+    sc = new SparkContext("local", "test")
+    val thrown = intercept[SparkException] {
+      sc.makeRDD(1 to 3).foreach { _ => throw new 
NonDeserializableUserException }
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonDeserializableUserException"))
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
+
+class UserException(message: String, cause: Throwable)
+  extends RuntimeException(message, cause)
+
+class NonSerializableUserException extends RuntimeException {
+  val nonSerializableInstanceVariable = new NonSerializable
+}
+
+class NonDeserializableUserException extends RuntimeException {
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new IOException("Intentional exception during deserialization.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 86dff8f..b0ca49c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -242,7 +242,7 @@ class DAGSchedulerSuite
 
   /** Sends TaskSetFailed to the scheduler. */
   private def failed(taskSet: TaskSet, message: String) {
-    runEvent(TaskSetFailed(taskSet, message))
+    runEvent(TaskSetFailed(taskSet, message, None))
   }
 
   /** Sends JobCancelled to the DAG scheduler. */

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f7cc4bb..edbdb48 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -48,7 +48,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: 
FakeTaskScheduler)
 
   override def executorLost(execId: String) {}
 
-  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+  override def taskSetFailed(
+      taskSet: TaskSet,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
     taskScheduler.taskSetsFailed += taskSet.id
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 56f7b9c..b140387 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -240,7 +240,7 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
     val taskFailedReasons = Seq(
       Resubmitted,
       new FetchFailed(null, 0, 0, 0, "ignored"),
-      ExceptionFailure("Exception", "description", null, null, None),
+      ExceptionFailure("Exception", "description", null, null, None, None),
       TaskResultLost,
       TaskKilled,
       ExecutorLostFailure("0"),

http://git-wip-us.apache.org/repos/asf/spark/blob/0579f28d/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index dde95f3..343a413 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -163,7 +163,8 @@ class JsonProtocolSuite extends SparkFunSuite {
   }
 
   test("ExceptionFailure backward compatibility") {
-    val exceptionFailure = ExceptionFailure("To be", "or not to be", 
stackTrace, null, None)
+    val exceptionFailure = ExceptionFailure("To be", "or not to be", 
stackTrace, null,
+      None, None)
     val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
       .removeField({ _._1 == "Full Stack Trace" })
     assertEquals(exceptionFailure, 
JsonProtocol.taskEndReasonFromJson(oldEvent))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to