This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new f07a54743c57 [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList
f07a54743c57 is described below

commit f07a54743c574bf703b406d0deba0b6e21a54273
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Thu Jul 18 15:37:20 2024 +0800

    [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList
    
    This PR backports https://github.com/apache/spark/pull/47197 to branch-3.5.
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use an `ArrayBuffer` guarded by a read/write lock rather than a `CopyOnWriteArrayList` for `TaskMetrics._externalAccums`.
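
    For illustration, here is a minimal self-contained sketch of that pattern (the `GuardedBuffer` name and its members are invented for this example; the real change lives in `TaskMetrics` in the diff below):

    ```scala
    import java.util.concurrent.locks.ReentrantReadWriteLock

    import scala.collection.mutable.ArrayBuffer

    // Sketch: an ArrayBuffer guarded by a ReentrantReadWriteLock. Appends take
    // the write lock; readers take the read lock and return a defensive copy.
    class GuardedBuffer[T] {
      private val lock = new ReentrantReadWriteLock()
      private val buffer = new ArrayBuffer[T]

      private def withReadLock[B](fn: => B): B = {
        lock.readLock().lock()
        try fn finally lock.readLock().unlock()
      }

      private def withWriteLock[B](fn: => B): B = {
        lock.writeLock().lock()
        try fn finally lock.writeLock().unlock()
      }

      // Amortized O(1) append, unlike CopyOnWriteArrayList.add, which copies
      // the whole backing array on every call.
      def register(elem: T): Unit = withWriteLock { buffer += elem }

      // Copy under the read lock so callers never iterate a buffer that a
      // concurrent register() is mutating.
      def snapshot: Seq[T] = withReadLock { buffer.toList }
    }
    ```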
    
    ### Why are the changes needed?
    
    It fixes the perf regression caused by the overhead of registering accumulators through `CopyOnWriteArrayList`, which copies its entire backing array on every write.
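
    For intuition, a rough micro-benchmark sketch (a hypothetical harness, not part of this patch; absolute numbers vary by machine): n appends to a `CopyOnWriteArrayList` cost O(n^2) total copying, while `ArrayBuffer` appends stay amortized O(1) each.

    ```scala
    import java.util.concurrent.CopyOnWriteArrayList

    import scala.collection.mutable.ArrayBuffer

    def time(label: String)(body: => Unit): Unit = {
      val start = System.nanoTime()
      body
      println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
    }

    val n = 100000
    time("CopyOnWriteArrayList") { // copies the backing array on every add
      val xs = new CopyOnWriteArrayList[Integer]
      (0 until n).foreach(i => xs.add(i))
    }
    time("ArrayBuffer") { // amortized O(1) append
      val xs = new ArrayBuffer[Int]
      (0 until n).foreach(i => xs += i)
    }
    ```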
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manually tested.
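
    For reference, one concurrency sanity check that could be run by hand against the `GuardedBuffer` sketch above (hypothetical, not part of the patch):

    ```scala
    // Four writer threads register concurrently with snapshot reads; the run
    // must neither throw nor lose elements.
    val reg = new GuardedBuffer[Int]
    val writers = (1 to 4).map { w =>
      new Thread(() => (0 until 1000).foreach(i => reg.register(w * 10000 + i)))
    }
    writers.foreach(_.start())
    (0 until 100).foreach(_ => reg.snapshot) // concurrent reads
    writers.foreach(_.join())
    assert(reg.snapshot.size == 4000)
    ```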
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47297 from Ngone51/SPARK-48791-3.5.
    
    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/pom.xml                                       |  4 ++
 .../org/apache/spark/util/ArrayImplicits.scala     | 35 ++++++++++++++++++
 .../org/apache/spark/util/ArrayImplicits.scala     | 35 ++++++++++++++++++
 .../org/apache/spark/executor/TaskMetrics.scala    | 43 +++++++++++++++++-----
 mllib-local/pom.xml                                | 12 ++++++
 mllib/pom.xml                                      | 10 +++++
 pom.xml                                            |  6 +++
 7 files changed, 135 insertions(+), 10 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index a3a02cbd1c56..a326e41b8e23 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -243,6 +243,10 @@
       <groupId>org.scala-lang.modules</groupId>
       <artifactId>scala-xml_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
diff --git a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
new file mode 100644
index 000000000000..82c6c75bd51a
--- /dev/null
+++ b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.collection.compat.immutable
+
+/**
+ * Implicit methods related to Scala Array.
+ */
+private[spark] object ArrayImplicits {
+
+  implicit class SparkArrayOps[T](xs: Array[T]) {
+
+    /**
+     * Wraps an Array[T] as an immutable.ArraySeq[T] without copying.
+     */
+    def toImmutableArraySeq: immutable.ArraySeq[T] =
+      immutable.ArraySeq.unsafeWrapArray(xs)
+  }
+}
diff --git a/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala
new file mode 100644
index 000000000000..38c2a415af3d
--- /dev/null
+++ b/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.collection.immutable
+
+/**
+ * Implicit methods related to Scala Array.
+ */
+private[spark] object ArrayImplicits {
+
+  implicit class SparkArrayOps[T](xs: Array[T]) {
+
+    /**
+     * Wraps an Array[T] as an immutable.ArraySeq[T] without copying.
+     */
+    def toImmutableArraySeq: immutable.ArraySeq[T] =
+      immutable.ArraySeq.unsafeWrapArray(xs)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index d446104cb642..468969b8d332 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
@@ -29,6 +29,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
+import org.apache.spark.util.ArrayImplicits._
 
 
 /**
@@ -150,6 +151,11 @@ class TaskMetrics private[spark] () extends Serializable {
   private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
     _updatedBlockStatuses.setValue(v.asJava)
 
+  private val (readLock, writeLock) = {
+    val lock = new ReentrantReadWriteLock()
+    (lock.readLock(), lock.writeLock())
+  }
+
   /**
   * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
    * data, defined only in tasks with input.
@@ -264,12 +270,32 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums = _externalAccums.asScala
+  private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock {
+    _externalAccums.toArray.toImmutableArraySeq
+  }
+
+  private def withReadLock[B](fn: => B): B = {
+    readLock.lock()
+    try {
+      fn
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  private def withWriteLock[B](fn: => B): B = {
+    writeLock.lock()
+    try {
+      fn
+    } finally {
+      writeLock.unlock()
+    }
+  }
 
-  private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-    _externalAccums.add(a)
+  private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = withWriteLock {
+    _externalAccums += a
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
@@ -328,19 +354,16 @@ private[spark] object TaskMetrics extends Logging {
    */
   def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
     val tm = new TaskMetrics
-    val externalAccums = new java.util.ArrayList[AccumulatorV2[Any, Any]]()
     for (acc <- accums) {
       val name = acc.name
-      val tmpAcc = acc.asInstanceOf[AccumulatorV2[Any, Any]]
       if (name.isDefined && tm.nameToAccums.contains(name.get)) {
         val tmAcc = tm.nameToAccums(name.get).asInstanceOf[AccumulatorV2[Any, Any]]
         tmAcc.metadata = acc.metadata
-        tmAcc.merge(tmpAcc)
+        tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
       } else {
-        externalAccums.add(tmpAcc)
+        tm._externalAccums += acc
       }
     }
-    tm._externalAccums.addAll(externalAccums)
     tm
   }
 }
diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml
index 9ef674c93081..4e00a7b2dc9b 100644
--- a/mllib-local/pom.xml
+++ b/mllib-local/pom.xml
@@ -37,7 +37,19 @@
     <dependency>
       <groupId>org.scalanlp</groupId>
       <artifactId>breeze_${scala.binary.version}</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
+    <!-- #if scala-2.13 --><!--
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+    </dependency>
+    --><!-- #endif scala-2.13 -->
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index f6640e470596..b9e6e4058346 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -96,10 +96,20 @@
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+    </dependency>
     --><!-- #endif scala-2.13 -->
     <dependency>
       <groupId>org.scalanlp</groupId>
       <artifactId>breeze_${scala.binary.version}</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git a/pom.xml b/pom.xml
index be910d24193d..57bae938891d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,7 @@
     <commons.collections4.version>4.4</commons.collections4.version>
     <scala.version>2.12.18</scala.version>
     <scala.binary.version>2.12</scala.binary.version>
+    <scala-collection-compat.version>2.7.0</scala-collection-compat.version>
     <scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version>
    <!-- dont update scala-maven-plugin to version 4.8.1 SPARK-42809 and SPARK-43595 -->
     <scala-maven-plugin.version>4.8.0</scala-maven-plugin.version>
@@ -1102,6 +1103,11 @@
         <artifactId>scala-xml_${scala.binary.version}</artifactId>
         <version>2.1.0</version>
       </dependency>
+      <dependency>
+        <groupId>org.scala-lang.modules</groupId>
+        <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
+        <version>${scala-collection-compat.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
