This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new f07a54743c57 [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList f07a54743c57 is described below commit f07a54743c574bf703b406d0deba0b6e21a54273 Author: Yi Wu <yi...@databricks.com> AuthorDate: Thu Jul 18 15:37:20 2024 +0800 [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList This PR backports https://github.com/apache/spark/pull/47197 to branch-3.5. ### What changes were proposed in this pull request? This PR proposes to use the `ArrayBuffer` together with the read/write lock rather than `CopyOnWriteArrayList` for `TaskMetrics._externalAccums`. ### Why are the changes needed? Fix the perf regression that was caused by the accumulators registration overhead using `CopyOnWriteArrayList`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47297 from Ngone51/SPARK-48791-3.5. 
Authored-by: Yi Wu <yi...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- core/pom.xml | 4 ++ .../org/apache/spark/util/ArrayImplicits.scala | 35 ++++++++++++++++++ .../org/apache/spark/util/ArrayImplicits.scala | 35 ++++++++++++++++++ .../org/apache/spark/executor/TaskMetrics.scala | 43 +++++++++++++++++----- mllib-local/pom.xml | 12 ++++++ mllib/pom.xml | 10 +++++ pom.xml | 6 +++ 7 files changed, 135 insertions(+), 10 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index a3a02cbd1c56..a326e41b8e23 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -243,6 +243,10 @@ <groupId>org.scala-lang.modules</groupId> <artifactId>scala-xml_${scala.binary.version}</artifactId> </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> diff --git a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala new file mode 100644 index 000000000000..82c6c75bd51a --- /dev/null +++ b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.collection.compat.immutable + +/** + * Implicit methods related to Scala Array. + */ +private[spark] object ArrayImplicits { + + implicit class SparkArrayOps[T](xs: Array[T]) { + + /** + * Wraps an Array[T] as an immutable.ArraySeq[T] without copying. + */ + def toImmutableArraySeq: immutable.ArraySeq[T] = + immutable.ArraySeq.unsafeWrapArray(xs) + } +} diff --git a/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala new file mode 100644 index 000000000000..38c2a415af3d --- /dev/null +++ b/core/src/main/scala-2.13/org/apache/spark/util/ArrayImplicits.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.collection.immutable + +/** + * Implicit methods related to Scala Array. + */ +private[spark] object ArrayImplicits { + + implicit class SparkArrayOps[T](xs: Array[T]) { + + /** + * Wraps an Array[T] as an immutable.ArraySeq[T] without copying. 
+ */ + def toImmutableArraySeq: immutable.ArraySeq[T] = + immutable.ArraySeq.unsafeWrapArray(xs) + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d446104cb642..468969b8d332 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} @@ -29,6 +29,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.{BlockId, BlockStatus} import org.apache.spark.util._ +import org.apache.spark.util.ArrayImplicits._ /** @@ -150,6 +151,11 @@ class TaskMetrics private[spark] () extends Serializable { private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = _updatedBlockStatuses.setValue(v.asJava) + private val (readLock, writeLock) = { + val lock = new ReentrantReadWriteLock() + (lock.readLock(), lock.writeLock()) + } + /** * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted * data, defined only in tasks with input. @@ -264,12 +270,32 @@ class TaskMetrics private[spark] () extends Serializable { /** * External accumulators registered with this task. 
*/ - @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]] + @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] - private[spark] def externalAccums = _externalAccums.asScala + private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock { + _externalAccums.toArray.toImmutableArraySeq + } + + private def withReadLock[B](fn: => B): B = { + readLock.lock() + try { + fn + } finally { + readLock.unlock() + } + } + + private def withWriteLock[B](fn: => B): B = { + writeLock.lock() + try { + fn + } finally { + writeLock.unlock() + } + } - private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { - _externalAccums.add(a) + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = withWriteLock { + _externalAccums += a } private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums @@ -328,19 +354,16 @@ private[spark] object TaskMetrics extends Logging { */ def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = { val tm = new TaskMetrics - val externalAccums = new java.util.ArrayList[AccumulatorV2[Any, Any]]() for (acc <- accums) { val name = acc.name - val tmpAcc = acc.asInstanceOf[AccumulatorV2[Any, Any]] if (name.isDefined && tm.nameToAccums.contains(name.get)) { val tmAcc = tm.nameToAccums(name.get).asInstanceOf[AccumulatorV2[Any, Any]] tmAcc.metadata = acc.metadata - tmAcc.merge(tmpAcc) + tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]]) } else { - externalAccums.add(tmpAcc) + tm._externalAccums += acc } } - tm._externalAccums.addAll(externalAccums) tm } } diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml index 9ef674c93081..4e00a7b2dc9b 100644 --- a/mllib-local/pom.xml +++ b/mllib-local/pom.xml @@ -37,7 +37,19 @@ <dependency> <groupId>org.scalanlp</groupId> <artifactId>breeze_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>org.scala-lang.modules</groupId> + 
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> </dependency> + <!-- #if scala-2.13 --><!-- + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + </dependency> + --><!-- #endif scala-2.13 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> diff --git a/mllib/pom.xml b/mllib/pom.xml index f6640e470596..b9e6e4058346 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -96,10 +96,20 @@ <groupId>org.scala-lang.modules</groupId> <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId> </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + </dependency> --><!-- #endif scala-2.13 --> <dependency> <groupId>org.scalanlp</groupId> <artifactId>breeze_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git a/pom.xml b/pom.xml index be910d24193d..57bae938891d 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,7 @@ <commons.collections4.version>4.4</commons.collections4.version> <scala.version>2.12.18</scala.version> <scala.binary.version>2.12</scala.binary.version> + <scala-collection-compat.version>2.7.0</scala-collection-compat.version> <scalatest-maven-plugin.version>2.2.0</scalatest-maven-plugin.version> <!-- dont update scala-maven-plugin to version 4.8.1 SPARK-42809 and SPARK-43595 --> <scala-maven-plugin.version>4.8.0</scala-maven-plugin.version> @@ -1102,6 +1103,11 @@ <artifactId>scala-xml_${scala.binary.version}</artifactId> <version>2.1.0</version> </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + 
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId> + <version>${scala-collection-compat.version}</version> + </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org