Repository: spark
Updated Branches:
  refs/heads/branch-1.1 4245404e8 -> e69deb818


[SPARK-3465] fix task metrics aggregation in local mode

Before overwrite t.taskMetrics, take a deepcopy of it.

Author: Davies Liu <davies....@gmail.com>

Closes #2338 from davies/fix_metric and squashes the following commits:

a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e69deb81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e69deb81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e69deb81

Branch: refs/heads/branch-1.1
Commit: e69deb81842639ee089b518e994080e27a343297
Parents: 4245404
Author: Davies Liu <davies....@gmail.com>
Authored: Thu Sep 11 18:53:26 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Thu Sep 11 18:56:34 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/executor/Executor.scala  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e69deb81/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2f76e53..640d7bb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -353,7 +353,16 @@ private[spark] class Executor(
             if (!taskRunner.attemptedTask.isEmpty) {
               Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                 metrics.updateShuffleReadMetrics
-                tasksMetrics += ((taskRunner.taskId, metrics))
+                if (isLocal) {
+                  // JobProgressListener will hold an reference of it during
+                  // onExecutorMetricsUpdate(), then JobProgressListener can 
not see
+                  // the changes of metrics any more, so make a deep copy of it
+                  val copiedMetrics = 
Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+                } else {
+                  // It will be copied by serialization
+                  tasksMetrics += ((taskRunner.taskId, metrics))
+                }
               }
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to