Repository: spark Updated Branches: refs/heads/branch-1.1 4245404e8 -> e69deb818
[SPARK-3465] fix task metrics aggregation in local mode Before overwrite t.taskMetrics, take a deepcopy of it. Author: Davies Liu <davies....@gmail.com> Closes #2338 from davies/fix_metric and squashes the following commits: a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric 7c879e0 [Davies Liu] add more comments 754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true 5ca26dc [Davies Liu] fix task metrics aggregation in local mode Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e69deb81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e69deb81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e69deb81 Branch: refs/heads/branch-1.1 Commit: e69deb81842639ee089b518e994080e27a343297 Parents: 4245404 Author: Davies Liu <davies....@gmail.com> Authored: Thu Sep 11 18:53:26 2014 -0700 Committer: Andrew Or <andrewo...@gmail.com> Committed: Thu Sep 11 18:56:34 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/executor/Executor.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e69deb81/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2f76e53..640d7bb 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -353,7 +353,16 @@ private[spark] class Executor( if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics - tasksMetrics += ((taskRunner.taskId, metrics)) + if (isLocal) { + // JobProgressListener will hold an reference of it during + // onExecutorMetricsUpdate(), then JobProgressListener can not see + // the changes of metrics any more, so make a deep copy of it + val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) + tasksMetrics += ((taskRunner.taskId, copiedMetrics)) + } else { + // It will be copied by serialization + tasksMetrics += ((taskRunner.taskId, metrics)) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org