This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 744b070fa964 [SPARK-48391][CORE] Using addAll instead of add function 
in fromAccumulatorInfos method of TaskMetrics Class
744b070fa964 is described below

commit 744b070fa964dee9e5460a24f88f22c3af8170dc
Author: Dereck Li <monkeyboy....@gmail.com>
AuthorDate: Fri May 31 15:56:05 2024 -0700

    [SPARK-48391][CORE] Using addAll instead of add function in 
fromAccumulatorInfos method of TaskMetrics Class
    
    ### What changes were proposed in this pull request?
    
    Using addAll instead of add function in fromAccumulators method of 
TaskMetrics.
    
    ### Why are the changes needed?
    
    To improve performance. In the fromAccumulators method of TaskMetrics, we 
should use `tm._externalAccums.addAll` instead of `tm._externalAccums.add`, 
as _externalAccums is an instance of CopyOnWriteArrayList
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes.
    
    ### How was this patch tested?
    
    No Tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46705 from monkeyboy123/fromAccumulators-accelerate.
    
    Authored-by: Dereck Li <monkeyboy....@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 3cd35f8cb6462051c621cf49de54b9c5692aae1d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 78b39b0cbda6..d446104cb642 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -328,16 +328,19 @@ private[spark] object TaskMetrics extends Logging {
    */
   def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
     val tm = new TaskMetrics
+    val externalAccums = new java.util.ArrayList[AccumulatorV2[Any, Any]]()
     for (acc <- accums) {
       val name = acc.name
+      val tmpAcc = acc.asInstanceOf[AccumulatorV2[Any, Any]]
       if (name.isDefined && tm.nameToAccums.contains(name.get)) {
         val tmAcc = tm.nameToAccums(name.get).asInstanceOf[AccumulatorV2[Any, 
Any]]
         tmAcc.metadata = acc.metadata
-        tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
+        tmAcc.merge(tmpAcc)
       } else {
-        tm._externalAccums.add(acc)
+        externalAccums.add(tmpAcc)
       }
     }
+    tm._externalAccums.addAll(externalAccums)
     tm
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to