This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c10160b4163 [SPARK-38640][CORE] Fix NPE with memory-only cache blocks 
and RDD fetching
c10160b4163 is described below

commit c10160b4163be00b8009cb462b1e33704b0ff3d6
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Sun Apr 17 08:39:27 2022 -0500

    [SPARK-38640][CORE] Fix NPE with memory-only cache blocks and RDD fetching
    
    ### What changes were proposed in this pull request?
    
    Fixes a bug where, when `spark.shuffle.service.fetch.rdd.enabled=true`, 
memory-only cached blocks fail to unpersist.
    
    ### Why are the changes needed?
    
    In https://github.com/apache/spark/pull/33020, when all RDD blocks are 
removed from `externalShuffleServiceBlockStatus`, the underlying Map is nulled 
to reduce memory. When persisting blocks we check whether a block uses disk before 
adding it to `externalShuffleServiceBlockStatus`, but when removing blocks there 
is no such check. As a result, for a memory-only cached block the Map in 
`externalShuffleServiceBlockStatus` stays null, and unpersisting the block throws an NPE 
because it tries to remove from the null Map. This a [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New and updated unit tests.
    
    Closes #35959 from Kimahriman/fetch-rdd-memory-only-unpersist.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit e0939f0f7c3d3bd4baa89e720038dbd3c7363a72)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/storage/BlockManagerMasterEndpoint.scala |  8 +++++---
 .../apache/spark/ExternalShuffleServiceSuite.scala | 22 ++++++++++++++++++++++
 .../spark/storage/BlockManagerInfoSuite.scala      |  2 ++
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4d8ba9b3e4e..adeb507941c 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -838,9 +838,11 @@ private[spark] class BlockStatusPerBlockId {
   }
 
   def remove(blockId: BlockId): Unit = {
-    blocks.remove(blockId)
-    if (blocks.isEmpty) {
-      blocks = null
+    if (blocks != null) {
+      blocks.remove(blockId)
+      if (blocks.isEmpty) {
+        blocks = null
+      }
     }
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index dd3d90f3124..1ca78d572c7 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -255,4 +255,26 @@ class ExternalShuffleServiceSuite extends ShuffleSuite 
with BeforeAndAfterAll wi
       }
     }
   }
+
+  test("SPARK-38640: memory only blocks can unpersist using shuffle service 
cache fetching") {
+    for (enabled <- Seq(true, false)) {
+      val confWithRddFetch =
+        conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, enabled)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test", 
confWithRddFetch)
+      sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+      sc.env.blockManager.blockStoreClient.getClass should 
equal(classOf[ExternalBlockStoreClient])
+      try {
+        val rdd = sc.parallelize(0 until 100, 2)
+          .map { i => (i, 1) }
+          .persist(StorageLevel.MEMORY_ONLY)
+
+        rdd.count()
+        rdd.unpersist(true)
+        assert(sc.persistentRdds.isEmpty)
+      } finally {
+        rpcHandler.applicationRemoved(sc.conf.getAppId, true)
+        sc.stop()
+      }
+    }
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
index f0c19c5ccce..85f012aece3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
@@ -63,6 +63,8 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     if (svcEnabled) {
       assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
     }
+    bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 0)
+    assert(bmInfo.remainingMem === 30000)
   }
 
   testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled, 
bmInfo) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to