This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bdece0  [SPARK-35543][CORE][FOLLOWUP] Fix memory leak in BlockManagerMasterEndpoint removeRdd
0bdece0 is described below

commit 0bdece015e20fc305c8b206b4a793902024803af
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Thu Jun 24 00:01:40 2021 -0500

    [SPARK-35543][CORE][FOLLOWUP] Fix memory leak in BlockManagerMasterEndpoint removeRdd
    
    ### What changes were proposed in this pull request?
    
    Wrapping the `JHashMap[BlockId, BlockStatus]` used in `blockStatusByShuffleService`
    in a new class, `BlockStatusPerBlockId`, which drops its reference to the map
    once all the persisted blocks have been removed.
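
    For reference, the new wrapper, condensed from the diff below (the full
    version lives in BlockManagerMasterEndpoint.scala):

        private[spark] class BlockStatusPerBlockId {

          private var blocks: JHashMap[BlockId, BlockStatus] = _

          def get(blockId: BlockId): Option[BlockStatus] =
            if (blocks == null) None else Option(blocks.get(blockId))

          def put(blockId: BlockId, blockStatus: BlockStatus): Unit = {
            if (blocks == null) blocks = new JHashMap[BlockId, BlockStatus]
            blocks.put(blockId, blockStatus)
          }

          // Dropping the reference to the empty map is what releases the memory.
          def remove(blockId: BlockId): Unit = {
            blocks.remove(blockId)
            if (blocks.isEmpty) blocks = null
          }
        }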
    
    ### Why are the changes needed?
    
    With https://github.com/apache/spark/pull/32790 a bug was introduced: when all
    the persisted blocks are removed, we remove the HashMap even though it is
    already shared by the block manager infos; when a new block is persisted, this
    map is needed again for storing the data, and it must be the same HashMap
    shared by the block manager infos created for the registered block managers
    running on the same host where the external shuffle service is.
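
    A minimal sketch of the failure mode, with simplified stand-in types (String
    keys instead of BlockManagerId/BlockId, used purely for illustration):

        import java.util.{HashMap => JHashMap}
        import scala.collection.mutable

        val blockStatusByShuffleService = new mutable.HashMap[String, JHashMap[String, String]]

        // At block manager registration the info object captures a reference to
        // the per-host map, so both sides must keep seeing the same instance.
        val mapSharedWithBlockManagerInfo =
          blockStatusByShuffleService.getOrElseUpdate("host1", new JHashMap[String, String])

        // Before this fix, removing the last persisted block dropped the whole
        // entry from blockStatusByShuffleService...
        blockStatusByShuffleService.remove("host1")

        // ...so persisting a new block re-created a fresh map, no longer the one
        // shared with the already registered block manager infos.
        val recreated = blockStatusByShuffleService.getOrElseUpdate("host1", new JHashMap[String, String])
        recreated.put("rdd_0_0", "DISK_ONLY")
        assert(!mapSharedWithBlockManagerInfo.containsKey("rdd_0_0")) // the views diverged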
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Extending `BlockManagerInfoSuite` with a test which removes all the persisted
    blocks and then adds another one.
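
    The key sequence of the new test, condensed from the diff below:

        bmInfo.updateBlockInfo(rddId1, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
        bmInfo.removeBlock(rddId1)  // empties the external shuffle service block statuses
        bmInfo.updateBlockInfo(rddId2, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
        // the new block must be tracked again, in the same shared map
        assert(getEssBlockStatus(bmInfo, rddId2) === Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))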
    
    Closes #33020 from attilapiros/SPARK-35543-2.
    
    Authored-by: attilapiros <piros.attila.zs...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 46 +++++++++++++-----
 .../spark/storage/BlockManagerInfoSuite.scala      | 55 +++++++++++++---------
 2 files changed, 67 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 80dd1cb..b48e73e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -63,7 +63,7 @@ class BlockManagerMasterEndpoint(
 
   // Mapping from external shuffle service block manager id to the block statuses.
   private val blockStatusByShuffleService =
-    new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]
+    new mutable.HashMap[BlockManagerId, BlockStatusPerBlockId]
 
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
@@ -278,11 +278,6 @@ class BlockManagerMasterEndpoint(
           blockIdsToDel += blockId
           blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatusForId =>
             blockStatusForId.remove(blockId)
-            // when all blocks are removed from the block statuses then for this BM Id the whole
-            // blockStatusByShuffleService entry can be removed to avoid leaking memory
-            if (blockStatusForId.isEmpty) {
-              blockStatusByShuffleService.remove(bmIdForShuffleService)
-            }
           }
         }
       }
@@ -569,8 +564,12 @@ class BlockManagerMasterEndpoint(
 
       val externalShuffleServiceBlockStatus =
         if (externalShuffleServiceRddFetchEnabled) {
+          // The blockStatusByShuffleService entries are never removed as they belong to the
+          // external shuffle service instances running on the cluster nodes. To decrease its
+          // memory footprint when all the disk persisted blocks are removed for a shuffle service
+          // BlockStatusPerBlockId releases the backing HashMap.
           val externalShuffleServiceBlocks = blockStatusByShuffleService
-            .getOrElseUpdate(externalShuffleServiceIdOnHost(id), new JHashMap[BlockId, BlockStatus])
+            .getOrElseUpdate(externalShuffleServiceIdOnHost(id), new BlockStatusPerBlockId)
           Some(externalShuffleServiceBlocks)
         } else {
           None
@@ -671,7 +670,7 @@ class BlockManagerMasterEndpoint(
     val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
     val status = locations.headOption.flatMap { bmId =>
       if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
-        blockStatusByShuffleService.get(bmId).flatMap(m => Option(m.get(blockId)))
+        blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
       } else {
         aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
       }
@@ -794,19 +793,44 @@ object BlockStatus {
   def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
 }
 
+/**
+ * Stores block statuses for block IDs but removes the reference to the Map used for storing
+ * the data when all the blocks are removed, to avoid keeping the memory when not needed.
+ */
+private[spark] class BlockStatusPerBlockId {
+
+  private var blocks: JHashMap[BlockId, BlockStatus] = _
+
+  def get(blockId: BlockId): Option[BlockStatus] =
+    if (blocks == null) None else Option(blocks.get(blockId))
+
+  def put(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    if (blocks == null) {
+      blocks = new JHashMap[BlockId, BlockStatus]
+    }
+    blocks.put(blockId, blockStatus)
+  }
+
+  def remove(blockId: BlockId): Unit = {
+    blocks.remove(blockId)
+    if (blocks.isEmpty) {
+      blocks = null
+    }
+  }
+
+}
+
 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
     val maxOnHeapMem: Long,
     val maxOffHeapMem: Long,
     val storageEndpoint: RpcEndpointRef,
-    val externalShuffleServiceBlockStatus: Option[JHashMap[BlockId, BlockStatus]])
+    val externalShuffleServiceBlockStatus: Option[BlockStatusPerBlockId])
   extends Logging {
 
   val maxMem = maxOnHeapMem + maxOffHeapMem
 
-  val externalShuffleServiceEnabled = externalShuffleServiceBlockStatus.isDefined
-
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
   private var _executorRemovalTs: Option[Long] = None
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
index 3f5ffaa..f0c19c5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.storage
 
-import java.util.{HashMap => JHashMap}
-
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
 
 class BlockManagerInfoSuite extends SparkFunSuite {
 
-  def testWithShuffleServiceOnOff(testName: String)
+  private def testWithShuffleServiceOnOff(testName: String)
       (f: (Boolean, BlockManagerInfo) => Unit): Unit = {
     Seq(true, false).foreach { svcEnabled =>
       val bmInfo = new BlockManagerInfo(
@@ -34,13 +32,19 @@ class BlockManagerInfoSuite extends SparkFunSuite {
         maxOnHeapMem = 10000,
         maxOffHeapMem = 20000,
         storageEndpoint = null,
-        if (svcEnabled) Some(new JHashMap[BlockId, BlockStatus]) else None)
+        if (svcEnabled) Some(new BlockStatusPerBlockId) else None)
       test(s"$testName externalShuffleServiceEnabled=$svcEnabled") {
         f(svcEnabled, bmInfo)
       }
     }
   }
 
+  private def getEssBlockStatus(bmInfo: BlockManagerInfo, blockId: BlockId): Option[BlockStatus] = {
+    assert(bmInfo.externalShuffleServiceBlockStatus.isDefined)
+    val blockStatusPerBlockId = bmInfo.externalShuffleServiceBlockStatus.get
+    blockStatusPerBlockId.get(blockId)
+  }
+
   testWithShuffleServiceOnOff("broadcast block") { (_, bmInfo) =>
     val broadcastId: BlockId = BroadcastBlockId(0, "field1")
     bmInfo.updateBlockInfo(
@@ -57,7 +61,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
       Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
+      assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
     }
   }
 
@@ -70,8 +74,8 @@ class BlockManagerInfoSuite extends SparkFunSuite {
       Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
+      assert(getEssBlockStatus(bmInfo, rddId) ===
+        Some(BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
     }
   }
 
@@ -83,8 +87,7 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+      assert(getEssBlockStatus(bmInfo, rddId) === Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     }
   }
 
@@ -96,15 +99,14 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     assert(bmInfo.blocks.asScala  === Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
+      assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
     }
 
     bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
     assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+      assert(getEssBlockStatus(bmInfo, rddId) === Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     }
   }
 
@@ -114,33 +116,40 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+      assert(getEssBlockStatus(bmInfo, rddId) === Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     }
 
     bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 200)
     assert(bmInfo.blocks.isEmpty)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
+      assert(getEssBlockStatus(bmInfo, rddId).isEmpty)
     }
   }
 
-  testWithShuffleServiceOnOff("remove block") { (svcEnabled, bmInfo) =>
-    val rddId: BlockId = RDDBlockId(0, 0)
-    bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
-    assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+  testWithShuffleServiceOnOff("remove block and add another one") { 
(svcEnabled, bmInfo) =>
+    val rddId1: BlockId = RDDBlockId(0, 0)
+    bmInfo.updateBlockInfo(rddId1, StorageLevel.DISK_ONLY, memSize = 0, 
diskSize = 200)
+    assert(bmInfo.blocks.asScala === Map(rddId1 -> 
BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
-        Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+      assert(getEssBlockStatus(bmInfo, rddId1) ===
+        Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     }
 
-    bmInfo.removeBlock(rddId)
+    bmInfo.removeBlock(rddId1)
     assert(bmInfo.blocks.asScala.isEmpty)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
-      assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
+      assert(getEssBlockStatus(bmInfo, rddId1).isEmpty)
+    }
+    val rddId2: BlockId = RDDBlockId(0, 1)
+    bmInfo.updateBlockInfo(rddId2, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
+    assert(bmInfo.blocks.asScala === Map(rddId2 -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
+    assert(bmInfo.remainingMem === 30000)
+    if (svcEnabled) {
+      assert(getEssBlockStatus(bmInfo, rddId2) ===
+        Some(BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
