Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d92568ae5 -> ff3e9561d


[SPARK-9809] Task crashes because the internal accumulators are not properly initialized

When a stage failed and another stage was resubmitted with only some of its
partitions to compute, all of its tasks failed with the error message:
java.util.NoSuchElementException: key not found: peakExecutionMemory.
This is because the internal accumulators were not properly initialized for the
resubmitted stage, while other code assumes the internal accumulators always exist.
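The failure mode can be reproduced outside Spark with a minimal sketch. `ToyStage`, `resetInternalAccumulators`, `runTask`, and `AccumulatorLookupDemo` here are hypothetical stand-ins, not Spark's real `Stage` class or task code. The sketch assumes only that a task looks up an internal accumulator by name in a `Map`, so an uninitialized (empty) collection makes the lookup throw the same exception quoted above.

```scala
// Hypothetical stand-in for a stage; not Spark's real Stage class.
class ToyStage {
  // Starts empty, mirroring a stage whose internal accumulators were never initialized.
  var internalAccumulators: Seq[(String, Long)] = Seq.empty

  // Hypothetical: populate the accumulators that tasks expect to find.
  def resetInternalAccumulators(): Unit = {
    internalAccumulators = Seq("peakExecutionMemory" -> 0L)
  }
}

object AccumulatorLookupDemo {
  // A task that assumes the accumulator always exists, as the commit message describes.
  def runTask(stage: ToyStage): Long = {
    val byName = stage.internalAccumulators.toMap
    // Map.apply throws NoSuchElementException("key not found: ...") for a missing key.
    byName("peakExecutionMemory")
  }

  def main(args: Array[String]): Unit = {
    val stage = new ToyStage
    try {
      runTask(stage)
      println("unexpected: lookup succeeded")
    } catch {
      case e: NoSuchElementException => println(s"task failed: ${e.getMessage}")
    }
    // Once the accumulators are initialized, the same lookup succeeds.
    stage.resetInternalAccumulators()
    println(s"after reset: ${runTask(stage)}")
  }
}
```

Running `main` prints the failure for the uninitialized stage and then the accumulator's initial value once it has been reset.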

Author: Carson Wang <carson.w...@intel.com>

Closes #8090 from carsonwang/SPARK-9809.

(cherry picked from commit 33bae585d4cb25aed2ac32e0d1248f78cc65318b)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff3e9561
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff3e9561
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff3e9561

Branch: refs/heads/branch-1.5
Commit: ff3e9561d63348076b77b3d16ca1a720461e87ea
Parents: d92568a
Author: Carson Wang <carson.w...@intel.com>
Authored: Fri Aug 14 13:38:25 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Aug 14 13:38:34 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff3e9561/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf..f1c63d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
       }
     }
 
+    // Create internal accumulators if the stage has no accumulators initialized.
     // Reset internal accumulators only if this stage is not partially submitted
     // Otherwise, we may override existing accumulator values from some tasks
-    if (allPartitions == partitionsToCompute) {
+    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
       stage.resetInternalAccumulators()
     }
 
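The patched condition can be read as a small predicate. The sketch below is hypothetical: `ResetPolicy`, `shouldReset`, and its parameter names are illustrative, not Spark API. It assumes partitions are identified by `Int` IDs and enumerates the three cases the changed `if` distinguishes.

```scala
object ResetPolicy {
  // Hypothetical predicate mirroring the patched condition in DAGScheduler:
  // reset when no accumulators exist yet, or when every partition is being recomputed.
  def shouldReset(
      accumulatorsEmpty: Boolean,
      allPartitions: Seq[Int],
      partitionsToCompute: Seq[Int]): Boolean =
    accumulatorsEmpty || allPartitions == partitionsToCompute

  def main(args: Array[String]): Unit = {
    val all = 0 until 4
    // Full submission: reset, as before the patch.
    println(shouldReset(accumulatorsEmpty = false, all, all))
    // Partial resubmission with existing accumulators: keep values from earlier tasks.
    println(shouldReset(accumulatorsEmpty = false, all, Seq(1, 3)))
    // Partial resubmission with no accumulators: the newly handled case.
    println(shouldReset(accumulatorsEmpty = true, all, Seq(1, 3)))
  }
}
```

The first two cases behave as before the patch; only the third changes. That is the partial-resubmission scenario from the commit message, where the stage has no accumulator values to preserve, so creating them cannot override results from earlier tasks.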

