This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b2ff4c4f7ec [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums
b2ff4c4f7ec is described below

commit b2ff4c4f7ec21d41cb173b413bd5aa5feefd7eee
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Fri Apr 7 10:14:07 2023 +0900

    [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums
    
    ### What changes were proposed in this pull request?
    This PR fixes a data race around concurrent access to
    `TaskMetrics.externalAccums`. The race occurs between the
    `executor-heartbeater` thread and the thread executing the task. This
    data race is not known to cause issues on Scala 2.12, but on Scala 2.13
    (LuciferYang bisected it to first cause failures in Scala 2.13.7; one
    possible reason is https://github.com/scala/scala/pull/9786 rather than
    the originally suspected https://github.com/scala/scala/pull/9258) it
    leads to an uncaught exception in the `executor-hea [...]
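    A minimal standalone sketch of the failure mode (illustrative only, not
    Spark code; the object and variable names are made up): one thread keeps
    appending to a `mutable.ArrayBuffer` while another thread iterates it,
    mimicking the task thread racing the `executor-heartbeater` thread. On
    Scala 2.13.7+ the buffer's fail-fast iterator may throw
    `java.util.ConcurrentModificationException`:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    object ExternalAccumsRaceSketch {
      def main(args: Array[String]): Unit = {
        val buf = new ArrayBuffer[Int]
        // Task-thread stand-in: keeps registering "accumulators".
        val writer = new Thread(() => (1 to 1000000).foreach(buf += _))
        writer.start()
        try {
          // Heartbeater-style reader: iterate while the writer mutates.
          while (writer.isAlive) {
            val it = buf.iterator
            while (it.hasNext) it.next()
          }
        } catch {
          case e: java.util.ConcurrentModificationException =>
            // In Spark this surfaced as an uncaught exception in the
            // executor-heartbeater thread.
            println(s"Race observed: $e")
        } finally writer.join()
      }
    }
    ```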
    
    This fix, using `CopyOnWriteArrayList`, is cherry-picked from
    https://github.com/apache/spark/pull/37206, where it was suggested as a
    fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed
    from outside the class `TaskMetrics`. The old PR was closed because at
    that point there was no clear understanding of the race condition.
    JoshRosen commented at
    https://github.com/apache/spark/pull/37206#issuecomment-1189930626 that
    there should be no such race base [...]
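    The core idea of the fix, in an isolated sketch (again illustrative, not
    the actual `TaskMetrics` code; `CowFixSketch` is a made-up name):
    `CopyOnWriteArrayList` iterators operate on an immutable snapshot of the
    underlying array, so a concurrent `add` cannot invalidate an in-progress
    read:

    ```scala
    import java.util.concurrent.CopyOnWriteArrayList

    import scala.collection.JavaConverters._

    object CowFixSketch {
      def main(args: Array[String]): Unit = {
        val accums = new CopyOnWriteArrayList[Int]
        val writer = new Thread(() => (1 to 1000000).foreach(accums.add(_)))
        writer.start()
        // Each iterator works on the snapshot taken at its creation, so the
        // concurrent add() calls cannot throw ConcurrentModificationException.
        while (writer.isAlive) accums.asScala.foreach(_ => ())
        writer.join()
        println(s"final size: ${accums.size}")
      }
    }
    ```

    As in the patch below, `asScala` only wraps the Java list without
    copying it, so readers still see new elements; they just never observe a
    half-updated structure.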
    
    ### Why are the changes needed?
    The current code has a data race.
    
    ### Does this PR introduce _any_ user-facing change?
    It fixes an uncaught exception in the `executor-heartbeater` thread when using Scala 2.13.
    
    ### How was this patch tested?
    This patch adds a new test case that, before the fix was applied, consistently produced the uncaught exception in the heartbeater thread when using Scala 2.13.
    
    Closes #40663 from eejbyfeldt/SPARK-39696.
    
    Lead-authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Co-authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 6ce0822f76e11447487d5f6b3cce94a894f2ceef)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/executor/TaskMetrics.scala    | 10 +++++++---
 .../org/apache/spark/executor/ExecutorSuite.scala  | 22 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1ca8590b1c9..78b39b0cbda 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import java.util.concurrent.CopyOnWriteArrayList
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
@@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
+
+  private[spark] def externalAccums = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-    externalAccums += a
+    _externalAccums.add(a)
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
         tmAcc.metadata = acc.metadata
         tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
       } else {
-        tm.externalAccums += acc
+        tm._externalAccums.add(acc)
       }
     }
     tm
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index bef36d08e8a..46f41195ebd 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.concurrent.duration._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.logging.log4j._
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
@@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
+    val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
+    conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
+    sc = new SparkContext(conf)
+
+    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i"))
+    val input = sc.parallelize(1 to 10, 10)
+    var testRdd = input.map(i => (i, i))
+    (0 to 10).foreach( i =>
+      testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) }).reduceByKey(_ + _)
+    )
+
+    val logAppender = new LogAppender("heartbeat thread should not die")
+    withLogAppender(logAppender, level = Some(Level.ERROR)) {
+      val _ = testRdd.count()
+    }
+    val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+      .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+    assert(logs.isEmpty)
+  }
+
   private def withMockHeartbeatReceiverRef(executor: Executor)
       (func: RpcEndpointRef => Unit): Unit = {
     val executorClass = classOf[Executor]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
