This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5250ff3dbd7e [SPARK-48791][CORE][FOLLOW-UP] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
5250ff3dbd7e is described below

commit 5250ff3dbd7ef5903f4d4af5444f2d31d323f00b
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Mon Aug 5 20:39:29 2024 +0800

    [SPARK-48791][CORE][FOLLOW-UP] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up fix for https://github.com/apache/spark/pull/47197. We
    found that the perf regression still exists after that fix, and that the
    culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR
    fixes it by avoiding the immutable conversion and instead enforcing read-lock
    protection when accessing `TaskMetrics#externalAccums`, to avoid the race
    issue (https://github.com/apache/spark/pull/40663).
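
    For illustration, here is a minimal sketch of the caller-side change, using
    the names from the `Task.scala` hunk below (the snippet is an excerpt, not a
    standalone compilable example):

    ```scala
    // Before: each access copied the buffer into an immutable Seq under the read lock.
    val accums = context.taskMetrics().externalAccums
      .filter(a => !taskFailed || a.countFailedValues)

    // After: the caller's function runs directly on the mutable ArrayBuffer while
    // the read lock is held, so no per-call copy is made.
    val accums = context.taskMetrics().withExternalAccums(
      _.filter(a => !taskFailed || a.countFailedValues))
    ```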
    
    ### Why are the changes needed?
    
    Fix perf regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Covered by existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47578 from Ngone51/SPARK-48791-followup.
    
    Lead-authored-by: Yi Wu <yi...@databricks.com>
    Co-authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../scala/org/apache/spark/executor/TaskMetrics.scala | 19 ++++++++++++++-----
 .../main/scala/org/apache/spark/scheduler/Task.scala  |  2 +-
 .../parquet/SpecificParquetRecordReaderBase.java      | 15 +++++++++------
 .../spark/sql/execution/ui/SQLAppStatusListener.scala |  4 ++--
 .../sql/execution/metric/SQLMetricsTestUtils.scala    |  4 ++--
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 227e7d84654d..8540e8c330a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -29,8 +29,6 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
-import org.apache.spark.util.ArrayImplicits._
-
 
 /**
  * :: DeveloperApi ::
@@ -272,8 +270,17 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock {
-    _externalAccums.toArray.toImmutableArraySeq
+  /**
+   * Applies `op` to `_externalAccums` while holding the read lock.
+   *
+   * Note that `op` is expected not to modify `_externalAccums` and not to return
+   * a lazily evaluated result, since such a result would be evaluated outside the
+   * read lock. We intentionally keep `_externalAccums` mutable rather than
+   * converting it to an immutable collection, for performance.
+   */
+  private[spark] def withExternalAccums[T](op: ArrayBuffer[AccumulatorV2[_, _]] => T)
+    : T = withReadLock {
+    op(_externalAccums)
   }
 
   private def withReadLock[B](fn: => B): B = {
@@ -298,7 +305,9 @@ class TaskMetrics private[spark] () extends Serializable {
     _externalAccums += a
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = withReadLock {
+    internalAccums ++ _externalAccums
+  }
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 6e449e4dc111..f511aed6d216 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -215,7 +215,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics().nonZeroInternalAccums() ++
        // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics().externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics().withExternalAccums(_.filter(a => !taskFailed || a.countFailedValues))
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6d00048154a5..d3716ef18447 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -139,12 +139,15 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     // in test case.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
-      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
-        @SuppressWarnings("unchecked")
-        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
-        intAccum.add(fileReader.getRowGroups().size());
-      }
+      taskContext.taskMetrics().withExternalAccums((accums) -> {
+        Option<AccumulatorV2<?, ?>> accu = accums.lastOption();
+        if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+          @SuppressWarnings("unchecked")
+          AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
+          intAccum.add(fileReader.getRowGroups().size());
+        }
+        return null;
+      });
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index dcbf328c71e3..fdd38595306a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -179,14 +179,14 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums.flatMap { a =>
+      event.taskMetrics.withExternalAccums(_.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
         } catch {
           case _: IllegalAccessError => None
         }
-      }
+      })
     } else {
       info.accumulables
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 72aa607591d5..2c5750962237 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
       res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
       var maxOutputRows = 0L
-      for (accum <- taskEnd.taskMetrics.externalAccums) {
+      taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
         val info = accum.toInfo(Some(accum.value), None)
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
@@ -322,7 +322,7 @@ object InputOutputMetricsHelper {
             case _ => // Ignore.
           }
         }
-      }
+      })
       res.sumMaxOutputRows += maxOutputRows
     }
   }

