This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new b2ff4c4f7ec [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums b2ff4c4f7ec is described below commit b2ff4c4f7ec21d41cb173b413bd5aa5feefd7eee Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com> AuthorDate: Fri Apr 7 10:14:07 2023 +0900 [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums ### What changes were proposed in this pull request? This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due to this change https://github.com/scala/scala/pull/9258~ (LuciferYang bisected this to first cause failures in scala 2.13.7; one possible reason could be https://github.com/scala/scala/pull/9786) leads to an uncaught exception in the `executor-hea [...] This fix of using `CopyOnWriteArrayList` is cherry picked from https://github.com/apache/spark/pull/37206 where it was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here https://github.com/apache/spark/pull/37206#issuecomment-1189930626 saying that there should be no such race base [...] ### Why are the changes needed? The current code has a data race. ### Does this PR introduce _any_ user-facing change? It will fix an uncaught exception in the `executor-heartbeater` thread when using scala 2.13. ### How was this patch tested? This patch adds a new test case that, before the fix was applied, consistently produced the uncaught exception in the heartbeater thread when using scala 2.13. Closes #40663 from eejbyfeldt/SPARK-39696. 
Lead-authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com> Co-authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 6ce0822f76e11447487d5f6b3cce94a894f2ceef) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../org/apache/spark/executor/TaskMetrics.scala | 10 +++++++--- .../org/apache/spark/executor/ExecutorSuite.scala | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 1ca8590b1c9..78b39b0cbda 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,6 +17,8 @@ package org.apache.spark.executor +import java.util.concurrent.CopyOnWriteArrayList + import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} @@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable { /** * External accumulators registered with this task. 
*/ - @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] + @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]] + + private[spark] def externalAccums = _externalAccums.asScala private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { - externalAccums += a + _externalAccums.add(a) } private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums @@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging { tmAcc.metadata = acc.metadata tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]]) } else { - tm.externalAccums += acc + tm._externalAccums.add(acc) } } tm diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index bef36d08e8a..46f41195ebd 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map} import scala.concurrent.duration._ import com.google.common.cache.{CacheBuilder, CacheLoader} +import org.apache.logging.log4j._ import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito.{inOrder, verify, when} @@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite heartbeatZeroAccumulatorUpdateTest(false) } + test("SPARK-39696: Using accumulators should not cause heartbeat to fail") { + val conf = new SparkConf().setMaster("local").setAppName("executor suite test") + conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms") + sc = new SparkContext(conf) + + val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i")) + val input = sc.parallelize(1 to 10, 10) + var testRdd = input.map(i => (i, i)) + (0 to 10).foreach( i => + testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) 
}).reduceByKey(_ + _) + ) + + val logAppender = new LogAppender("heartbeat thread should not die") + withLogAppender(logAppender, level = Some(Level.ERROR)) { + val _ = testRdd.count() + } + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + .filter(_.contains("Uncaught exception in thread executor-heartbeater")) + assert(logs.isEmpty) + } + private def withMockHeartbeatReceiverRef(executor: Executor) (func: RpcEndpointRef => Unit): Unit = { val executorClass = classOf[Executor] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org