This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new c3ba8fa69cbb [SPARK-48791][CORE][FOLLOW-UP][3.4] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
c3ba8fa69cbb is described below

commit c3ba8fa69cbb88d5880a203f4b98c8bceb1c436b
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Mon Aug 5 10:33:23 2024 -0700

    [SPARK-48791][CORE][FOLLOW-UP][3.4] Fix regression caused by immutable conversion on TaskMetrics#externalAccums
    
    This PR backports https://github.com/apache/spark/pull/47578 to branch-3.4.
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up fix for https://github.com/apache/spark/pull/47197. We found that the perf regression still existed after that fix and traced the culprit to the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion and instead enforcing read-lock protection whenever `TaskMetrics#externalAccums` is accessed, to avoid the race issue (https://github.com/apache/spark/pull/40663).
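
    As an illustration of the locking pattern (a minimal, self-contained sketch; `MetricsHolder`, `register`, and `withAccums` are hypothetical names, not the actual Spark API): readers pass a function that runs against the mutable buffer while the read lock is held, so no per-access immutable copy is needed.

    ```scala
    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable.ArrayBuffer

    // Sketch only: mirrors the read/write-lock pattern used around the
    // accumulator buffer; names here are illustrative, not Spark's.
    class MetricsHolder[A] {
      private val lock = new ReentrantReadWriteLock()
      private val accums = new ArrayBuffer[A]

      // Writers take the write lock, so registration never interleaves with readers.
      def register(a: A): Unit = {
        lock.writeLock().lock()
        try accums += a finally lock.writeLock().unlock()
      }

      // `op` must not mutate the buffer and must be evaluated eagerly: a lazily
      // evaluated view could be forced after the lock is released.
      def withAccums[T](op: ArrayBuffer[A] => T): T = {
        lock.readLock().lock()
        try op(accums) finally lock.readLock().unlock()
      }
    }
    ```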
    
    ### Why are the changes needed?
    
    Fix the perf regression caused by the immutable conversion on `TaskMetrics#externalAccums`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Covered by existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47613 from Ngone51/SPARK-48791-followup-3.4.
    
    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/util/ArrayImplicits.scala     | 35 ----------------------
 .../org/apache/spark/executor/TaskMetrics.scala    | 19 ++++++++----
 .../scala/org/apache/spark/scheduler/Task.scala    |  2 +-
 .../parquet/SpecificParquetRecordReaderBase.java   | 15 ++++++----
 .../sql/execution/ui/SQLAppStatusListener.scala    |  4 +--
 .../sql/execution/metric/SQLMetricsTestUtils.scala |  4 +--
 6 files changed, 28 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala b/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
deleted file mode 100644
index 82c6c75bd51a..000000000000
--- a/core/src/main/scala-2.12/org/apache/spark/util/ArrayImplicits.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.compat.immutable
-
-/**
- * Implicit methods related to Scala Array.
- */
-private[spark] object ArrayImplicits {
-
-  implicit class SparkArrayOps[T](xs: Array[T]) {
-
-    /**
-     * Wraps an Array[T] as an immutable.ArraySeq[T] without copying.
-     */
-    def toImmutableArraySeq: immutable.ArraySeq[T] =
-      immutable.ArraySeq.unsafeWrapArray(xs)
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 468969b8d332..e88b70eb655c 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -29,8 +29,6 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util._
-import org.apache.spark.util.ArrayImplicits._
-
 
 /**
  * :: DeveloperApi ::
@@ -272,8 +270,17 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   @transient private[spark] lazy val _externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = withReadLock {
-    _externalAccums.toArray.toImmutableArraySeq
+  /**
+   * Applies `op` to `_externalAccums` while holding the read lock.
+   *
+   * Note that `op` must not modify `_externalAccums`, and must not be lazily
+   * evaluated (e.g. by returning a view of the buffer), since the result could
+   * otherwise be forced after the read lock is released. We intentionally keep
+   * `_externalAccums` mutable rather than converting it to an immutable
+   * collection, for performance.
+   */
+  private[spark] def withExternalAccums[T](op: ArrayBuffer[AccumulatorV2[_, _]] => T)
+    : T = withReadLock {
+    op(_externalAccums)
   }
 
   private def withReadLock[B](fn: => B): B = {
@@ -298,7 +305,9 @@ class TaskMetrics private[spark] () extends Serializable {
     _externalAccums += a
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = withReadLock {
+    internalAccums ++ _externalAccums
+  }
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 001e3220e73b..4f011cf9a3dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.withExternalAccums(_.filter(a => !taskFailed || a.countFailedValues))
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 678b287a5e3a..4aa5ebceec11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -129,12 +129,15 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     // in test case.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
-      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
-        @SuppressWarnings("unchecked")
-        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
-        intAccum.add(fileReader.getRowGroups().size());
-      }
+      taskContext.taskMetrics().withExternalAccums((accums) -> {
+        Option<AccumulatorV2<?, ?>> accu = accums.lastOption();
+        if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+          @SuppressWarnings("unchecked")
+          AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
+          intAccum.add(fileReader.getRowGroups().size());
+        }
+        return null;
+      });
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7b9f877bdef5..cc92c6c08bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -178,14 +178,14 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums.flatMap { a =>
+      event.taskMetrics.withExternalAccums(_.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
         } catch {
           case _: IllegalAccessError => None
         }
-      }
+      })
     } else {
       info.accumulables
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 81667d52e16a..46dc84c0582f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
       res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
       var maxOutputRows = 0L
-      for (accum <- taskEnd.taskMetrics.externalAccums) {
+      taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
         val info = accum.toInfo(Some(accum.value), None)
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
@@ -322,7 +322,7 @@ object InputOutputMetricsHelper {
             case _ => // Ignore.
           }
         }
-      }
+      })
       res.sumMaxOutputRows += maxOutputRows
     }
   }

