This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new c3ba8fa69cbb [SPARK-48791][CORE][FOLLOW-UP][3.4] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
c3ba8fa69cbb is described below

commit c3ba8fa69cbb88d5880a203f4b98c8bceb1c436b
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Mon Aug 5 10:33:23 2024 -0700

    [SPARK-48791][CORE][FOLLOW-UP][3.4] Fix regression caused by immutable conversion on TaskMetrics#externalAccums

    This PR backports https://github.com/apache/spark/pull/47578 to branch-3.4.

    ### What changes were proposed in this pull request?

    This is a followup fix for https://github.com/apache/spark/pull/47197. We found that the perf regression still exists after that fix and identified the culprit as the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion and instead enforcing read-lock protection around accesses to `TaskMetrics#externalAccums`, to avoid the race issue (https://github.com/apache/spark/pull/40663).

    ### Why are the changes needed?

    Fix the performance regression.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Covered by existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #47613 from Ngone51/SPARK-48791-followup-3.4.

    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/util/ArrayImplicits.scala     | 35 ----------------------
 .../org/apache/spark/executor/TaskMetrics.scala    | 19 ++++++++----
 .../scala/org/apache/spark/scheduler/Task.scala    |  2 +-
 .../parquet/SpecificParquetRecordReaderBase.java   | 15 ++++++----
 .../sql/execution/ui/SQLAppStatusListener.scala    |  4 +--
 .../sql/execution/metric/SQLMetricsTestUtils.scala |  4 +--
 6 files changed, 28 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
deleted file mode 100644
index 82c6c75bd51a..000000000000
--- a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.compat.immutable
-
-/**
- * Implicit methods related to Scala Array.
- */
-private[spark] object ArrayImplicits {
-
-  implicit class SparkArrayOps[T](xs: Array[T]) {
-
-    /**
-     * Wraps an Array[T] as an immutable.ArraySeq[T] without copying.
-     */
-    def toImmutableArraySeq: immutable.ArraySeq[T] =
-      immutable.ArraySeq.unsafeWrapArray(xs)
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 468969b8d332..e88b70eb655c 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -29,8 +29,6 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
-import org.apache.spark.util.ArrayImplicits._
-
 
 /**
  * :: DeveloperApi ::
@@ -272,8 +270,17 @@ class TaskMetrics private[spark] () extends Serializable {
    */
  @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock {
-    _externalAccums.toArray.toImmutableArraySeq
+  /**
+   * Performs the conversion `op` on `_externalAccums` while holding the read lock.
+   *
+   * Note that `op` must not modify `_externalAccums` and must not return a lazily
+   * evaluated collection over it, since that would be traversed outside the lock.
+   * We intentionally keep `_externalAccums` mutable instead of converting it to an
+   * immutable collection, for performance reasons.
+   */
+  private[spark] def withExternalAccums[T](op: ArrayBuffer[AccumulatorV2[_, _]] => T)
+    : T = withReadLock {
+    op(_externalAccums)
   }
 
   private def withReadLock[B](fn: => B): B = {
@@ -298,7 +305,9 @@ class TaskMetrics private[spark] () extends Serializable {
     _externalAccums += a
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = withReadLock {
+    internalAccums ++ _externalAccums
+  }
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 001e3220e73b..4f011cf9a3dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.withExternalAccums(_.filter(a => !taskFailed || a.countFailedValues))
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 678b287a5e3a..4aa5ebceec11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -129,12 +129,15 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     // in test case.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
-      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
-        @SuppressWarnings("unchecked")
-        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
-        intAccum.add(fileReader.getRowGroups().size());
-      }
+      taskContext.taskMetrics().withExternalAccums((accums) -> {
+        Option<AccumulatorV2<?, ?>> accu = accums.lastOption();
+        if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+          @SuppressWarnings("unchecked")
+          AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
+          intAccum.add(fileReader.getRowGroups().size());
+        }
+        return null;
+      });
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7b9f877bdef5..cc92c6c08bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -178,14 +178,14 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums.flatMap { a =>
+      event.taskMetrics.withExternalAccums(_.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
         } catch {
           case _: IllegalAccessError => None
         }
-      }
+      })
     } else {
       info.accumulables
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 81667d52e16a..46dc84c0582f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
       res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
       var maxOutputRows = 0L
-      for (accum <- taskEnd.taskMetrics.externalAccums) {
+      taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
         val info = accum.toInfo(Some(accum.value), None)
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
@@ -322,7 +322,7 @@
             case _ => // Ignore.
           }
         }
-      }
+      })
       res.sumMaxOutputRows += maxOutputRows
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
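
Below is a minimal, self-contained Scala sketch of the read-lock pattern the patch adopts, for readers following the change above. `Metric`, `MetricsHolder`, and `Demo` are hypothetical stand-ins invented for this sketch; the real code guards `TaskMetrics._externalAccums` (an `ArrayBuffer[AccumulatorV2[_, _]]`), and only the shape of `withReadLock`/`withExternalAccums` mirrors the commit.

// Minimal sketch with hypothetical names; not Spark source code.
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable.ArrayBuffer

final class Metric(val name: String, var value: Long = 0L)

final class MetricsHolder {
  private val lock = new ReentrantReadWriteLock()

  // Kept mutable on purpose: copying into an immutable Seq on every access
  // is exactly the per-call conversion that caused the regression.
  private val _externalAccums = new ArrayBuffer[Metric]

  private def withReadLock[B](fn: => B): B = {
    lock.readLock().lock()
    try fn finally lock.readLock().unlock()
  }

  private def withWriteLock[B](fn: => B): B = {
    lock.writeLock().lock()
    try fn finally lock.writeLock().unlock()
  }

  def register(m: Metric): Unit = withWriteLock {
    _externalAccums += m
  }

  // Hand the mutable buffer to `op` while the read lock is held. `op` must not
  // mutate the buffer and should return a strictly evaluated result so nothing
  // is traversed after the lock is released.
  def withExternalAccums[T](op: ArrayBuffer[Metric] => T): T = withReadLock {
    op(_externalAccums)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val metrics = new MetricsHolder
    metrics.register(new Metric("number of output rows", 42L))

    // Caller-side migration: `metrics.externalAccums.filter(p)` becomes
    // `metrics.withExternalAccums(_.filter(p).toSeq)`, materialized under the lock.
    val nonZero = metrics.withExternalAccums(_.filter(_.value != 0L).toSeq)
    println(nonZero.map(_.name).mkString(", "))
  }
}

Keeping the buffer mutable avoids the per-access copy behind the regression, while running `op` under the read lock preserves the race protection originally added in https://github.com/apache/spark/pull/40663.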