Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d92568ae5 -> ff3e9561d
[SPARK-9809] Task crashes because the internal accumulators are not properly initialized

When a stage failed and another stage was resubmitted with only some of its partitions to compute, all the tasks failed with the error message: java.util.NoSuchElementException: key not found: peakExecutionMemory.

This is because the internal accumulators are not properly initialized for this stage, while other code assumes that the internal accumulators always exist.

Author: Carson Wang <carson.w...@intel.com>

Closes #8090 from carsonwang/SPARK-9809.

(cherry picked from commit 33bae585d4cb25aed2ac32e0d1248f78cc65318b)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff3e9561
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff3e9561
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff3e9561

Branch: refs/heads/branch-1.5
Commit: ff3e9561d63348076b77b3d16ca1a720461e87ea
Parents: d92568a
Author: Carson Wang <carson.w...@intel.com>
Authored: Fri Aug 14 13:38:25 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Aug 14 13:38:34 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff3e9561/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf..f1c63d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
       }
     }
 
+    // Create internal accumulators if the stage has no accumulators initialized.
     // Reset internal accumulators only if this stage is not partially submitted
     // Otherwise, we may override existing accumulator values from some tasks
-    if (allPartitions == partitionsToCompute) {
+    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
       stage.resetInternalAccumulators()
     }
 
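The effect of the changed condition can be illustrated with a small Scala model. This is a hypothetical sketch, not Spark code: SketchStage, AccumulatorInitSketch, shouldResetOld, shouldResetNew, submit and runTask are invented names, and the stage's internal accumulators are modeled as a Map from accumulator name to a Long value rather than as Spark's real accumulator objects. A task that reads a missing key through Map.apply fails with the same "key not found: peakExecutionMemory" NoSuchElementException reported in the commit message.

```scala
// Hypothetical model of the SPARK-9809 fix; names are illustrative, not Spark's API.
// Assumption: internal accumulators are modeled as a name -> value map.
final class SketchStage(val numPartitions: Int) {
  // Starts empty, like a stage whose internal accumulators were never created.
  var internalAccumulators: Map[String, Long] = Map.empty

  // Creates (or re-creates) the accumulators with zeroed values.
  def resetInternalAccumulators(): Unit =
    internalAccumulators = Map("peakExecutionMemory" -> 0L)
}

object AccumulatorInitSketch {
  // Condition before the patch: reset only on a full (non-partial) submission.
  def shouldResetOld(stage: SketchStage, partitionsToCompute: Seq[Int]): Boolean = {
    val allPartitions = 0 until stage.numPartitions
    allPartitions == partitionsToCompute
  }

  // Condition after the patch: also reset when nothing has been initialized yet.
  def shouldResetNew(stage: SketchStage, partitionsToCompute: Seq[Int]): Boolean = {
    val allPartitions = 0 until stage.numPartitions
    stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute
  }

  // Simulates submitting a stage using either the old or the new condition.
  def submit(stage: SketchStage, partitionsToCompute: Seq[Int], useFix: Boolean): Unit = {
    val reset =
      if (useFix) shouldResetNew(stage, partitionsToCompute)
      else shouldResetOld(stage, partitionsToCompute)
    if (reset) stage.resetInternalAccumulators()
  }

  // Simulates a task that assumes the accumulator exists; if it does not,
  // Map.apply throws java.util.NoSuchElementException("key not found: peakExecutionMemory").
  def runTask(stage: SketchStage): Long =
    stage.internalAccumulators("peakExecutionMemory")
}
```

Under the old condition, a stage that has never been initialized and is submitted with only some of its partitions is left without accumulators, so runTask throws. The new condition initializes the accumulators in that case, while a partially resubmitted stage that already has accumulators keeps its existing values, which is what the comment about not overriding values from some tasks protects.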