Repository: spark
Updated Branches:
  refs/heads/master ffa05c84f -> 33bae585d


[SPARK-9809] Task crashes because the internal accumulators are not properly initialized

When a stage failed and another stage was resubmitted with only part of the
partitions to compute, all the tasks failed with the error message:
java.util.NoSuchElementException: key not found: peakExecutionMemory.
This is because the internal accumulators are not properly initialized for this
stage, while other code assumes the internal accumulators always exist.
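For illustration, a minimal, self-contained sketch of the failure mode. The
TaskMetricsSketch class and its map are hypothetical stand-ins, not the actual
Spark task-metrics code; the point is that looking up a named internal
accumulator that was never created surfaces as exactly this exception.

import java.util.NoSuchElementException

// Minimal sketch of the failure mode; TaskMetricsSketch and its map are
// hypothetical stand-ins for the real task-side bookkeeping, not Spark code.
object MissingAccumulatorSketch {
  class TaskMetricsSketch(accumulators: Map[String, Long]) {
    // Internal accumulators are looked up by name; if the stage never
    // initialized them, the map is empty and this lookup throws
    // java.util.NoSuchElementException: key not found: peakExecutionMemory.
    def peakExecutionMemory: Long = accumulators("peakExecutionMemory")
  }

  def main(args: Array[String]): Unit = {
    val metricsWithoutAccumulators = new TaskMetricsSketch(Map.empty)
    try {
      metricsWithoutAccumulators.peakExecutionMemory
    } catch {
      case e: NoSuchElementException => println(e)
    }
  }
}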

Author: Carson Wang <carson.w...@intel.com>

Closes #8090 from carsonwang/SPARK-9809.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33bae585
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33bae585
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33bae585

Branch: refs/heads/master
Commit: 33bae585d4cb25aed2ac32e0d1248f78cc65318b
Parents: ffa05c8
Author: Carson Wang <carson.w...@intel.com>
Authored: Fri Aug 14 13:38:25 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Aug 14 13:38:25 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33bae585/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf..f1c63d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
       }
     }
 
+    // Create internal accumulators if the stage has no accumulators initialized.
     // Reset internal accumulators only if this stage is not partially submitted
     // Otherwise, we may override existing accumulator values from some tasks
-    if (allPartitions == partitionsToCompute) {
+    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
       stage.resetInternalAccumulators()
     }
 

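As a standalone illustration of the new guard, here is a simplified model; the
StageSketch class below is illustrative, not the actual DAGScheduler/Stage
types. The isEmpty check handles a resubmitted stage that never got its
accumulators, while the equality check preserves existing values during a
partial resubmission.

// Simplified model of the (re)initialization guard; StageSketch is
// illustrative and not the real Spark Stage/DAGScheduler classes.
class StageSketch(numPartitions: Int) {
  var internalAccumulators: Seq[String] = Seq.empty

  def resetInternalAccumulators(): Unit = {
    internalAccumulators = Seq("peakExecutionMemory")
  }

  def submitMissingTasks(partitionsToCompute: Seq[Int]): Unit = {
    val allPartitions = 0 until numPartitions
    // Create internal accumulators if the stage has none yet (e.g. a stage
    // resubmitted for only some partitions); otherwise reset them only when
    // every partition is being computed, to avoid overriding accumulator
    // values already reported by tasks from an earlier submission.
    if (internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
      resetInternalAccumulators()
    }
  }
}

object StageSketch {
  def main(args: Array[String]): Unit = {
    val stage = new StageSketch(numPartitions = 4)
    stage.submitMissingTasks(Seq(1, 3))            // partial resubmission
    println(stage.internalAccumulators.nonEmpty)   // true: accumulators now exist
  }
}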

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
