This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1387af7847c [SPARK-39553][CORE] Multi-thread unregister shuffle 
shouldn't throw NPE when using Scala 2.13
1387af7847c is described below

commit 1387af7847cb4e40b82ac978b991bf5ffedf6ed4
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jun 29 18:09:10 2022 -0500

    [SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE 
when using Scala 2.13
    
    This PR adds a `shuffleStatus != null` condition to the 
`o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing an NPE 
when using Scala 2.13.
    
    Ensure that no NPE is thrown when 
`o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called by multiple threads. 
This PR is only needed for Scala 2.13.
    
    The `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method is called 
concurrently from the following two paths:
    
    - BlockManagerStorageEndpoint:
    
    
https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62
    
    - ContextCleaner:
    
    
https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241
    
    When testing with Scala 2.13, for example the `sql/core` module, there are 
many log entries like the following, although they did not cause UT failures:
    
    ```
    17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to 
remove shuffle 87 - null
    java.lang.NullPointerException
            at 
org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
            at 
org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
            at scala.Option.foreach(Option.scala:437)
            at 
org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
            at 
org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59)
            at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
            at 
org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
            at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
            at 
scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 
94
    java.lang.NullPointerException
            at 
org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
            at 
org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
            at scala.Option.foreach(Option.scala:437)
            at 
org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
            at 
org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
            at scala.Option.foreach(Option.scala:437)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
            at 
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432)
            at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
            at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
    ```
    
    I think this is a bug in Scala 2.13.8 and have already submitted an issue at 
https://github.com/scala/bug/issues/12613. This PR only adds protection; we 
should remove this protection after Scala 2.13 (maybe via 
https://github.com/scala/scala/pull/9957) fixes this issue.
    
    No user-facing change is introduced by this PR.
    
    - Pass GA
    - Add a new test `SPARK-39553: Multi-thread unregister shuffle shouldn't 
throw NPE` to `MapOutputTrackerSuite`; it can be run manually as follows:
    
    ```
    dev/change-scala-version.sh 2.13
    mvn clean install -DskipTests -pl core -am -Pscala-2.13
    mvn clean test -pl core -Pscala-2.13 -Dtest=none 
-DwildcardSuites=org.apache.spark.MapOutputTrackerSuite
    ```
    
    **Before**
    
    ```
    - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** 
FAILED ***
      3 did not equal 0 (MapOutputTrackerSuite.scala:971)
    Run completed in 17 seconds, 505 milliseconds.
    Total number of tests run: 25
    Suites: completed 2, aborted 0
    Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0
    *** 1 TEST FAILED ***
    ```
    
    **After**
    
    ```
    - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE
    Run completed in 17 seconds, 996 milliseconds.
    Total number of tests run: 25
    Suites: completed 2, aborted 0
    Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0
    All tests passed.
    
    ```
    
    Closes #37024 from LuciferYang/SPARK-39553.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit 29258964cae45cea43617ade971fb4ea9fe2902a)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  |  8 +++--
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index af26abc0989..e469c9989f2 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -832,8 +832,12 @@ private[spark] class MapOutputTrackerMaster(
   /** Unregister shuffle data */
   def unregisterShuffle(shuffleId: Int): Unit = {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
-      shuffleStatus.invalidateSerializedMapOutputStatusCache()
-      shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      // SPARK-39553: Add protection for Scala 2.13 due to 
https://github.com/scala/bug/issues/12613
+      // We should revert this if Scala 2.13 solves this issue.
+      if (shuffleStatus != null) {
+        shuffleStatus.invalidateSerializedMapOutputStatusCache()
+        shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      }
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 0ee2c779979..980d0835661 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import java.util.concurrent.atomic.LongAdder
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.mockito.ArgumentMatchers.any
@@ -910,4 +912,36 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
     rpcEnv.shutdown()
     slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    val shuffleIdRange = 0 until 100
+    shuffleIdRange.foreach { shuffleId =>
+      tracker.registerShuffle(shuffleId, 2, 
MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+    }
+    val npeCounter = new LongAdder()
+    // More threads will help to reproduce the problem
+    val threads = new Array[Thread](5)
+    threads.indices.foreach { i =>
+      threads(i) = new Thread() {
+        override def run(): Unit = {
+          shuffleIdRange.foreach { shuffleId =>
+            try {
+              tracker.unregisterShuffle(shuffleId)
+            } catch {
+              case _: NullPointerException => npeCounter.increment()
+            }
+          }
+        }
+      }
+    }
+    threads.foreach(_.start())
+    threads.foreach(_.join())
+    tracker.stop()
+    rpcEnv.shutdown()
+    assert(npeCounter.intValue() == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to