This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new c10160b4163 [SPARK-38640][CORE] Fix NPE with memory-only cache blocks and RDD fetching c10160b4163 is described below commit c10160b4163be00b8009cb462b1e33704b0ff3d6 Author: Adam Binford <adam...@gmail.com> AuthorDate: Sun Apr 17 08:39:27 2022 -0500 [SPARK-38640][CORE] Fix NPE with memory-only cache blocks and RDD fetching ### What changes were proposed in this pull request? Fixes a bug where if `spark.shuffle.service.fetch.rdd.enabled=true`, memory-only cached blocks will fail to unpersist. ### Why are the changes needed? In https://github.com/apache/spark/pull/33020, when all RDD blocks are removed from `externalShuffleServiceBlockStatus`, the underlying Map is nulled to reduce memory. When persisting blocks we check if it's using disk before adding it to `externalShuffleServiceBlockStatus`, but when removing them there is no check, so a memory-only cache block will keep `externalShuffleServiceBlockStatus` null, and when unpersisting it throws an NPE because it tries to remove from the null Map. This a [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New and updated UT Closes #35959 from Kimahriman/fetch-rdd-memory-only-unpersist. 
Authored-by: Adam Binford <adam...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> (cherry picked from commit e0939f0f7c3d3bd4baa89e720038dbd3c7363a72) Signed-off-by: Sean Owen <sro...@gmail.com> --- .../spark/storage/BlockManagerMasterEndpoint.scala | 8 +++++--- .../apache/spark/ExternalShuffleServiceSuite.scala | 22 ++++++++++++++++++++++ .../spark/storage/BlockManagerInfoSuite.scala | 2 ++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 4d8ba9b3e4e..adeb507941c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -838,9 +838,11 @@ private[spark] class BlockStatusPerBlockId { } def remove(blockId: BlockId): Unit = { - blocks.remove(blockId) - if (blocks.isEmpty) { - blocks = null + if (blocks != null) { + blocks.remove(blockId) + if (blocks.isEmpty) { + blocks = null + } } } diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index dd3d90f3124..1ca78d572c7 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -255,4 +255,26 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi } } } + + test("SPARK-38640: memory only blocks can unpersist using shuffle service cache fetching") { + for (enabled <- Seq(true, false)) { + val confWithRddFetch = + conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, enabled) + sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetch) + sc.env.blockManager.externalShuffleServiceEnabled should equal(true) + sc.env.blockManager.blockStoreClient.getClass should 
equal(classOf[ExternalBlockStoreClient]) + try { + val rdd = sc.parallelize(0 until 100, 2) + .map { i => (i, 1) } + .persist(StorageLevel.MEMORY_ONLY) + + rdd.count() + rdd.unpersist(true) + assert(sc.persistentRdds.isEmpty) + } finally { + rpcHandler.applicationRemoved(sc.conf.getAppId, true) + sc.stop() + } + } + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala index f0c19c5ccce..85f012aece3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala @@ -63,6 +63,8 @@ class BlockManagerInfoSuite extends SparkFunSuite { if (svcEnabled) { assert(getEssBlockStatus(bmInfo, rddId).isEmpty) } + bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 0) + assert(bmInfo.remainingMem === 30000) } testWithShuffleServiceOnOff("RDD block with MEMORY_AND_DISK") { (svcEnabled, bmInfo) => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org