This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5250ff3dbd7e [SPARK-48791][CORE][FOLLOW-UP] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
5250ff3dbd7e is described below

commit 5250ff3dbd7ef5903f4d4af5444f2d31d323f00b
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Mon Aug 5 20:39:29 2024 +0800

    [SPARK-48791][CORE][FOLLOW-UP] Fix regression caused by immutable conversion on TaskMetrics#externalAccums

    ### What changes were proposed in this pull request?

    This is a follow-up fix for https://github.com/apache/spark/pull/47197. We found that the perf
    regression still existed after that fix and tracked the culprit down to the immutable conversion
    on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion and
    instead enforcing read-lock protection whenever `TaskMetrics#externalAccums` is accessed, so the
    race issue (https://github.com/apache/spark/pull/40663) is still avoided.

    ### Why are the changes needed?

    Fix the performance regression.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Covered by existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #47578 from Ngone51/SPARK-48791-followup.

    Lead-authored-by: Yi Wu <yi...@databricks.com>
    Co-authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../scala/org/apache/spark/executor/TaskMetrics.scala  | 19 ++++++++++++++-----
 .../main/scala/org/apache/spark/scheduler/Task.scala   |  2 +-
 .../parquet/SpecificParquetRecordReaderBase.java       | 15 +++++++++------
 .../spark/sql/execution/ui/SQLAppStatusListener.scala  |  4 ++--
 .../sql/execution/metric/SQLMetricsTestUtils.scala     |  4 ++--
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 227e7d84654d..8540e8c330a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -29,8 +29,6 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
-import org.apache.spark.util.ArrayImplicits._
-
 
 /**
  * :: DeveloperApi ::
@@ -272,8 +270,17 @@ class TaskMetrics private[spark] () extends Serializable {
    */
  @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]

-  private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock {
-    _externalAccums.toArray.toImmutableArraySeq
+  /**
+   * Perform an `op` conversion on the `_externalAccums` within the read lock.
+   *
+   * Note `op` is expected to not modify the `_externalAccums` and not being
+   * lazy evaluation for safe concern since `ArrayBuffer` is lazily evaluated.
+   * And we intentionally keeps `_externalAccums` as mutable instead of converting
+   * it to immutable for the performance concern.
+   */
+  private[spark] def withExternalAccums[T](op: ArrayBuffer[AccumulatorV2[_, _]] => T)
+    : T = withReadLock {
+    op(_externalAccums)
   }

   private def withReadLock[B](fn: => B): B = {
@@ -298,7 +305,9 @@ class TaskMetrics private[spark] () extends Serializable {
     _externalAccums += a
   }

-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = withReadLock {
+    internalAccums ++ _externalAccums
+  }

   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 6e449e4dc111..f511aed6d216 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -215,7 +215,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics().nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics().externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics().withExternalAccums(_.filter(a => !taskFailed || a.countFailedValues))
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6d00048154a5..d3716ef18447 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -139,12 +139,15 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     // in test case.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
-      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
-        @SuppressWarnings("unchecked")
-        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
-        intAccum.add(fileReader.getRowGroups().size());
-      }
+      taskContext.taskMetrics().withExternalAccums((accums) -> {
+        Option<AccumulatorV2<?, ?>> accu = accums.lastOption();
+        if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+          @SuppressWarnings("unchecked")
+          AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
+          intAccum.add(fileReader.getRowGroups().size());
+        }
+        return null;
+      });
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index dcbf328c71e3..fdd38595306a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -179,14 +179,14 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums.flatMap { a =>
+      event.taskMetrics.withExternalAccums(_.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
         } catch {
           case _: IllegalAccessError => None
         }
-      }
+      })
     } else {
       info.accumulables
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 72aa607591d5..2c5750962237 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
       res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead

       var maxOutputRows = 0L
-      for (accum <- taskEnd.taskMetrics.externalAccums) {
+      taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
         val info = accum.toInfo(Some(accum.value), None)
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
@@ -322,7 +322,7 @@ object InputOutputMetricsHelper {
             case _ => // Ignore.
           }
         }
-      }
+      })
       res.sumMaxOutputRows += maxOutputRows
     }
   }
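
For readers following the description above without the Spark sources at hand, here is a minimal,
self-contained Scala sketch of the pattern the commit adopts. It is not Spark's actual code: the
names LockedAccums, register and withAccums are hypothetical; only withReadLock and
withExternalAccums in the diff above are real. The idea is to keep the underlying ArrayBuffer
mutable and let callers run a function against it while a read lock is held, instead of copying
it into an immutable Seq on every access.

import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a metrics holder; class and method names are illustrative only.
class LockedAccums[A] {
  private val lock = new ReentrantReadWriteLock()
  private val accums = new ArrayBuffer[A]

  private def withReadLock[B](fn: => B): B = {
    lock.readLock().lock()
    try fn finally lock.readLock().unlock()
  }

  private def withWriteLock[B](fn: => B): B = {
    lock.writeLock().lock()
    try fn finally lock.writeLock().unlock()
  }

  // Mutation happens under the write lock.
  def register(a: A): Unit = withWriteLock { accums += a; () }

  // Readers pass a function that runs against the mutable buffer while the read lock is held.
  // `op` must not mutate the buffer and must evaluate eagerly, so that nothing lazy escapes
  // the lock. No defensive copy of the buffer is made.
  def withAccums[T](op: ArrayBuffer[A] => T): T = withReadLock { op(accums) }
}

object LockedAccumsDemo {
  def main(args: Array[String]): Unit = {
    val m = new LockedAccums[String]
    m.register("numOutputRows")
    // Materialize the result inside the lock before using it.
    val names = m.withAccums(_.toList)
    println(names) // List(numOutputRows)
  }
}

Registration takes the write lock, readers share the read lock, and no per-call array copy is
made, which is the cost the follow-up removes while still guarding against the concurrent
modification described in the linked PRs.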