Repository: spark
Updated Branches:
  refs/heads/master 9fd82dbbc -> 05bf4e4af


[SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext

Author: Reynold Xin <r...@apache.org>

Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits:

6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05bf4e4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05bf4e4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05bf4e4a

Branch: refs/heads/master
Commit: 05bf4e4aff0d052a53d3e64c43688f07e27fec50
Parents: 9fd82db
Author: Reynold Xin <r...@apache.org>
Authored: Mon Aug 4 20:39:18 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Mon Aug 4 20:39:18 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala  |  9 +++++++--
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 11 +++--------
 2 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
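
For context, the crash fixed here happens when a user-defined accumulator
throws during the driver-side merge. Below is a minimal sketch against the
Spark 1.x accumulator API (ThrowingParam and AccumulatorCrashDemo are
hypothetical names, not part of this patch): task-side adds succeed, so every
task finishes, but DAGScheduler.handleTaskCompletion then merges the per-task
updates on the driver via Accumulators.add, which calls the param's addInPlace.

import org.apache.spark.{AccumulatorParam, SparkContext}

// Hypothetical misbehaved accumulator: task-side adds work, but the
// driver-side merge (addInPlace) throws.
object ThrowingParam extends AccumulatorParam[Int] {
  override def zero(initialValue: Int): Int = 0
  override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2  // runs in tasks
  override def addInPlace(r1: Int, r2: Int): Int =              // runs on the driver
    throw new RuntimeException("misbehaved accumulator merge")
}

object AccumulatorCrashDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "accumulator-demo")
    val acc = sc.accumulator(0)(ThrowingParam)
    // The driver merges each task's update in handleTaskCompletion. Before
    // this patch the RuntimeException escaped into the DAGScheduler event
    // loop; with it, the error is logged and the job completes.
    sc.parallelize(1 to 10, 2).foreach(i => acc.add(1))
    sc.stop()
  }
}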


http://git-wip-us.apache.org/repos/asf/spark/blob/05bf4e4a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d87c304..9fa3a4e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {
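
Note the behavior this hunk chooses: a failed merge is logged and skipped
rather than failing the stage, so the offending task's accumulator updates are
dropped, but the task still counts as successful and the job can finish. The
only trace is the "Failed to update accumulators for ..." line in the driver
log.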

http://git-wip-us.apache.org/repos/asf/spark/blob/05bf4e4a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 36e238b..8c1b0fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and 
SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and 
SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
 
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
 
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)
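
The previously ignored version of this test expected
SparkDriverExecutionException from both jobs; with the fix, the intercept
wrappers are gone and both jobs are expected to complete normally, while the
final assert confirms the SparkContext is still usable for new jobs.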

