This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 744b070fa964 [SPARK-48391][CORE] Using addAll instead of add function in fromAccumulatorInfos method of TaskMetrics Class 744b070fa964 is described below commit 744b070fa964dee9e5460a24f88f22c3af8170dc Author: Dereck Li <monkeyboy....@gmail.com> AuthorDate: Fri May 31 15:56:05 2024 -0700 [SPARK-48391][CORE] Using addAll instead of add function in fromAccumulatorInfos method of TaskMetrics Class ### What changes were proposed in this pull request? Using addAll instead of add function in fromAccumulators method of TaskMetrics. ### Why are the changes needed? To improve performance. In the fromAccumulators method of TaskMetrics, we should use `tm._externalAccums.addAll` instead of `tm._externalAccums.add`, as _externalAccums is an instance of CopyOnWriteArrayList. ### Does this PR introduce _any_ user-facing change? yes. ### How was this patch tested? No Tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46705 from monkeyboy123/fromAccumulators-accelerate. 
Authored-by: Dereck Li <monkeyboy....@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 3cd35f8cb6462051c621cf49de54b9c5692aae1d) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 78b39b0cbda6..d446104cb642 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -328,16 +328,19 @@ private[spark] object TaskMetrics extends Logging { */ def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = { val tm = new TaskMetrics + val externalAccums = new java.util.ArrayList[AccumulatorV2[Any, Any]]() for (acc <- accums) { val name = acc.name + val tmpAcc = acc.asInstanceOf[AccumulatorV2[Any, Any]] if (name.isDefined && tm.nameToAccums.contains(name.get)) { val tmAcc = tm.nameToAccums(name.get).asInstanceOf[AccumulatorV2[Any, Any]] tmAcc.metadata = acc.metadata - tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]]) + tmAcc.merge(tmpAcc) } else { - tm._externalAccums.add(acc) + externalAccums.add(tmpAcc) } } + tm._externalAccums.addAll(externalAccums) tm } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org