This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1387af7847c [SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13

1387af7847c is described below

commit 1387af7847cb4e40b82ac978b991bf5ffedf6ed4
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jun 29 18:09:10 2022 -0500

[SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13

This PR adds a `shuffleStatus != null` check to the `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing an NPE when using Scala 2.13.

The check ensures that no NPE is thrown when `o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called from multiple threads; this PR only targets Scala 2.13. The method can be called concurrently from the following two paths:

- BlockManagerStorageEndpoint: https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62
- ContextCleaner: https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241

When testing with Scala 2.13 (for example, the `sql/core` module), many log entries like the following appear, although they do not cause unit test failures:

```
17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to remove shuffle 87 - null
java.lang.NullPointerException
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
	at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
	at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 94
java.lang.NullPointerException
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
	at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
	at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
```

I think this is a bug in Scala 2.13.8 and have already filed an issue at https://github.com/scala/bug/issues/12613. This PR only adds protection; we should remove it once Scala 2.13 fixes the issue (possibly via https://github.com/scala/scala/pull/9957).
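Both stack traces fail inside the lambda passed to `scala.Option.foreach`, which suggests that `shuffleStatuses.remove(shuffleId)` returned a `Some` wrapping `null` rather than `None`: `Option.foreach` applies its function to whatever value the `Some` holds, `null` included. The standalone sketch below illustrates why the `shuffleStatus != null` guard avoids the NPE in that case. `FakeShuffleStatus`, `NullGuardSketch`, `unregister`, and `unregisterUnguarded` are hypothetical stand-ins, not Spark APIs, and the `Some(null)` value is passed in directly rather than produced by a real race.

```scala
// Hypothetical stand-in for Spark's ShuffleStatus; it only counts invalidations.
final class FakeShuffleStatus {
  var invalidations = 0
  def invalidateSerializedMapOutputStatusCache(): Unit = invalidations += 1
  def invalidateSerializedMergeOutputStatusCache(): Unit = invalidations += 1
}

object NullGuardSketch {
  // Same shape as the patched unregisterShuffle: `removed` plays the role of
  // the Option returned by shuffleStatuses.remove(shuffleId).
  def unregister(removed: Option[FakeShuffleStatus]): Unit =
    removed.foreach { shuffleStatus =>
      // Option.foreach does not filter out a null payload, so Some(null)
      // reaches this lambda; the guard turns that case into a no-op.
      if (shuffleStatus != null) {
        shuffleStatus.invalidateSerializedMapOutputStatusCache()
        shuffleStatus.invalidateSerializedMergeOutputStatusCache()
      }
    }

  // Same shape without the guard, as before the fix.
  def unregisterUnguarded(removed: Option[FakeShuffleStatus]): Unit =
    removed.foreach { shuffleStatus =>
      shuffleStatus.invalidateSerializedMapOutputStatusCache()
      shuffleStatus.invalidateSerializedMergeOutputStatusCache()
    }

  def main(args: Array[String]): Unit = {
    val status = new FakeShuffleStatus
    unregister(Some(status)) // normal removal: both caches are invalidated
    unregister(None)         // entry already removed by another caller
    unregister(Some(null))   // simulated bad result: skipped by the guard
    println(status.invalidations) // 2

    val threwNpe =
      try { unregisterUnguarded(Some(null)); false }
      catch { case _: NullPointerException => true }
    println(threwNpe) // true
  }
}
```

The guard only changes behavior for the `Some(null)` case; a normal `Some(status)` or `None` is handled exactly as before.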
No user-facing change.

- Pass GitHub Actions (GA)
- Add new test `SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE` to `MapOutputTrackerSuite`; it can be run manually as follows:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl core -am -Pscala-2.13
mvn clean test -pl core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.MapOutputTrackerSuite
```

**Before**

```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** FAILED ***
  3 did not equal 0 (MapOutputTrackerSuite.scala:971)
Run completed in 17 seconds, 505 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0
*** 1 TEST FAILED ***
```

**After**

```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE
Run completed in 17 seconds, 996 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
```

Closes #37024 from LuciferYang/SPARK-39553.
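The new test registers 100 shuffles, unregisters each of them from five threads at once, and counts any `NullPointerException` in a `LongAdder`. A minimal standalone version of that harness might look like the following, assuming a `java.util.concurrent.ConcurrentHashMap` viewed through `asScala` as a stand-in for the tracker's internal map. `StatusStub`, `ConcurrentUnregisterSketch`, `unregister`, and `unregisterConcurrently` are hypothetical names. Whether an unguarded version would actually hit the NPE depends on the Scala version and on thread timing, so this sketch only demonstrates the guarded removal staying NPE-free.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

import scala.jdk.CollectionConverters._

// Hypothetical stand-in for Spark's ShuffleStatus.
final class StatusStub {
  def invalidateCaches(): Unit = ()
}

object ConcurrentUnregisterSketch {
  // Stand-in for the tracker's shuffle-status map: a Java ConcurrentHashMap
  // viewed as a scala.collection.concurrent.Map.
  val statuses: scala.collection.concurrent.Map[Int, StatusStub] =
    new ConcurrentHashMap[Int, StatusStub]().asScala

  // Guarded removal, mirroring the patched unregisterShuffle.
  def unregister(shuffleId: Int): Unit =
    statuses.remove(shuffleId).foreach { status =>
      if (status != null) status.invalidateCaches()
    }

  /** Registers `ids`, unregisters all of them from `numThreads` threads, returns the NPE count. */
  def unregisterConcurrently(numThreads: Int, ids: Range): Long = {
    ids.foreach(id => statuses.put(id, new StatusStub))
    val npeCounter = new LongAdder()
    val threads = Array.fill[Thread](numThreads) {
      new Thread {
        override def run(): Unit = ids.foreach { id =>
          try { unregister(id) } catch { case _: NullPointerException => npeCounter.increment() }
        }
      }
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    npeCounter.sum()
  }

  def main(args: Array[String]): Unit = {
    val npes = unregisterConcurrently(numThreads = 5, ids = 0 until 100)
    println(s"NPEs: $npes, entries left: ${statuses.size}")
  }
}
```

Counting exceptions in a `LongAdder` instead of letting them escape keeps a worker thread's NPE from being lost on that thread, so the main thread can assert on the total after `join()`, the same approach the suite's test takes.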
Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: Sean Owen <sro...@gmail.com>
(cherry picked from commit 29258964cae45cea43617ade971fb4ea9fe2902a)
Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala | 8 +++--
 .../org/apache/spark/MapOutputTrackerSuite.scala | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index af26abc0989..e469c9989f2 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -832,8 +832,12 @@ private[spark] class MapOutputTrackerMaster(
   /** Unregister shuffle data */
   def unregisterShuffle(shuffleId: Int): Unit = {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
-      shuffleStatus.invalidateSerializedMapOutputStatusCache()
-      shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      // SPARK-39553: Add protection for Scala 2.13 due to https://github.com/scala/bug/issues/12613
+      // We should revert this if Scala 2.13 solves this issue.
+      if (shuffleStatus != null) {
+        shuffleStatus.invalidateSerializedMapOutputStatusCache()
+        shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      }
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 0ee2c779979..980d0835661 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -17,6 +17,8 @@

 package org.apache.spark

+import java.util.concurrent.atomic.LongAdder
+
 import scala.collection.mutable.ArrayBuffer

 import org.mockito.ArgumentMatchers.any
@@ -910,4 +912,36 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    val shuffleIdRange = 0 until 100
+    shuffleIdRange.foreach { shuffleId =>
+      tracker.registerShuffle(shuffleId, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+    }
+    val npeCounter = new LongAdder()
+    // More threads will help to reproduce the problem
+    val threads = new Array[Thread](5)
+    threads.indices.foreach { i =>
+      threads(i) = new Thread() {
+        override def run(): Unit = {
+          shuffleIdRange.foreach { shuffleId =>
+            try {
+              tracker.unregisterShuffle(shuffleId)
+            } catch {
+              case _: NullPointerException => npeCounter.increment()
+            }
+          }
+        }
+      }
+    }
+    threads.foreach(_.start())
+    threads.foreach(_.join())
+    tracker.stop()
+    rpcEnv.shutdown()
+    assert(npeCounter.intValue() == 0)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org