This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 3173439  [SPARK-29055][CORE] Update driver/executors' storage memory 
when block is removed from BlockManager
3173439 is described below

commit 31734399d57f3c128e66b0f97ef83eb4c9165978
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Oct 1 09:41:51 2019 -0700

    [SPARK-29055][CORE] Update driver/executors' storage memory when block is 
removed from BlockManager
    
    This patch fixes the issue that storage memory is not decreased even 
when a block is removed from BlockManager. The issue was originally found when 
removing a broadcast was not reflected in the storage memory on driver/executors.
    
    AppStatusListener expects the memory value in block update events to be a 
"delta" and adjusts driver/executors' storage memory based on that delta, but 
when removing a block BlockManager reports the delta as 0, so the storage memory 
is not decreased. `BlockManager.dropFromMemory` already deals with this correctly, 
so some of the paths that free memory were already being reported correctly.
    
    The storage memory metric in AppStatusListener is currently out of sync, which 
can easily lead end users to believe that a memory leak is happening.
    
    No.
    
    Modified UTs. Also manually tested by running a simple query repeatedly and 
observing the executor page of the Spark UI to confirm that the value of storage 
memory decreases as well.
    
    Please refer to the description of 
[SPARK-29055](https://issues.apache.org/jira/browse/SPARK-29055) for a simple 
reproducer.
    
    Closes #25973 from HeartSaVioR/SPARK-29055.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
    (cherry picked from commit a4601cb44e3709699f616f83d486351a1f459f8c)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 10 +++-
 .../apache/spark/storage/BlockManagerSuite.scala   | 57 ++++++++++++++++++++--
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e35dd72..9e1693a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1583,15 +1583,23 @@ private[spark] class BlockManager(
    * lock on the block.
    */
   private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit 
= {
+    val blockStatus = if (tellMaster) {
+      val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+      Some(getCurrentBlockStatus(blockId, blockInfo))
+    } else None
+
     // Removals are idempotent in disk store and memory store. At worst, we 
get a warning.
     val removedFromMemory = memoryStore.remove(blockId)
     val removedFromDisk = diskStore.remove(blockId)
     if (!removedFromMemory && !removedFromDisk) {
       logWarning(s"Block $blockId could not be removed as it was not found on 
disk or in memory")
     }
+
     blockInfoManager.removeBlock(blockId)
     if (tellMaster) {
-      reportBlockStatus(blockId, BlockStatus.empty)
+      // Only update storage level from the captured block status before 
deleting, so that
+      // memory size and disk size are being kept for calculating delta.
+      reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = 
StorageLevel.NONE))
     }
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index abde4df..b44cef5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -27,8 +27,8 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
-import org.mockito.{Matchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.{ArgumentCaptor, Matchers => mc}
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
 import org.scalatest._
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
@@ -128,9 +128,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     // need to create a SparkContext is to initialize LiveListenerBus.
     sc = mock(classOf[SparkContext])
     when(sc.conf).thenReturn(conf)
-    master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
+    master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf))), conf, true)
+        new LiveListenerBus(conf))), conf, true))
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -275,14 +275,19 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       assert(!store.hasLocalBlock("a1-to-remove"))
       master.getLocations("a1-to-remove") should have size 0
+      assertUpdateBlockInfoReportedForRemovingBlock(store, "a1-to-remove",
+        removedFromMemory = true, removedFromDisk = false)
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       assert(!store.hasLocalBlock("a2-to-remove"))
       master.getLocations("a2-to-remove") should have size 0
+      assertUpdateBlockInfoReportedForRemovingBlock(store, "a2-to-remove",
+        removedFromMemory = true, removedFromDisk = false)
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       assert(store.hasLocalBlock("a3-to-remove"))
       master.getLocations("a3-to-remove") should have size 0
+      assertUpdateBlockInfoNotReported(store, "a3-to-remove")
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
@@ -361,16 +366,21 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(!executorStore.hasLocalBlock(broadcast0BlockId))
     assert(executorStore.hasLocalBlock(broadcast1BlockId))
     assert(executorStore.hasLocalBlock(broadcast2BlockId))
+    assertUpdateBlockInfoReportedForRemovingBlock(executorStore, 
broadcast0BlockId,
+      removedFromMemory = false, removedFromDisk = true)
 
     // nothing should be removed from the driver store
     assert(driverStore.hasLocalBlock(broadcast0BlockId))
     assert(driverStore.hasLocalBlock(broadcast1BlockId))
     assert(driverStore.hasLocalBlock(broadcast2BlockId))
+    assertUpdateBlockInfoNotReported(driverStore, broadcast0BlockId)
 
     // remove broadcast 0 block from the driver as well
     master.removeBroadcast(0, removeFromMaster = true, blocking = true)
     assert(!driverStore.hasLocalBlock(broadcast0BlockId))
     assert(driverStore.hasLocalBlock(broadcast1BlockId))
+    assertUpdateBlockInfoReportedForRemovingBlock(driverStore, 
broadcast0BlockId,
+      removedFromMemory = false, removedFromDisk = true)
 
     // remove broadcast 1 block from both the stores asynchronously
     // and verify all broadcast 1 blocks have been removed
@@ -378,6 +388,10 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       assert(!driverStore.hasLocalBlock(broadcast1BlockId))
       assert(!executorStore.hasLocalBlock(broadcast1BlockId))
+      assertUpdateBlockInfoReportedForRemovingBlock(driverStore, 
broadcast1BlockId,
+        removedFromMemory = false, removedFromDisk = true)
+      assertUpdateBlockInfoReportedForRemovingBlock(executorStore, 
broadcast1BlockId,
+        removedFromMemory = false, removedFromDisk = true)
     }
 
     // remove broadcast 2 from both the stores asynchronously
@@ -388,11 +402,46 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
       assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
       assert(!executorStore.hasLocalBlock(broadcast2BlockId))
       assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
+      assertUpdateBlockInfoReportedForRemovingBlock(driverStore, 
broadcast2BlockId,
+        removedFromMemory = false, removedFromDisk = true)
+      assertUpdateBlockInfoReportedForRemovingBlock(driverStore, 
broadcast2BlockId2,
+        removedFromMemory = false, removedFromDisk = true)
+      assertUpdateBlockInfoReportedForRemovingBlock(executorStore, 
broadcast2BlockId,
+        removedFromMemory = false, removedFromDisk = true)
+      assertUpdateBlockInfoReportedForRemovingBlock(executorStore, 
broadcast2BlockId2,
+        removedFromMemory = false, removedFromDisk = true)
     }
     executorStore.stop()
     driverStore.stop()
   }
 
+  private def assertUpdateBlockInfoReportedForRemovingBlock( // Verifies `master` got exactly one StorageLevel.NONE update for this store/block.
+      store: BlockManager, // its blockManagerId must match the reported one
+      blockId: BlockId, // block whose removal report is checked
+      removedFromMemory: Boolean, // true: reported memory size must be > 0; false: must be 0
+      removedFromDisk: Boolean): Unit = { // true: reported disk size must be > 0; false: must be 0
+    def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved: 
Boolean): Unit = { // checks the single size value held by `captor`
+      assert(captor.getAllValues().size() === 1) // exactly one value must have been captured
+      if (expectRemoved) {
+        assert(captor.getValue() > 0) // a removed block reports its previous (non-zero) size
+      } else {
+        assert(captor.getValue() === 0) // nothing was stored there, so the reported size is 0
+      }
+    }
+
+    val memSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
+    val diskSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
+    verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId), // default verify mode: called once
+      mc.eq(StorageLevel.NONE), memSizeCaptor.capture(), 
diskSizeCaptor.capture())
+    assertSizeReported(memSizeCaptor, removedFromMemory)
+    assertSizeReported(diskSizeCaptor, removedFromDisk)
+  }
+
+  private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId: 
BlockId): Unit = { // Asserts `master` never received a StorageLevel.NONE update for this store/block.
+    verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId), 
mc.eq(blockId),
+      mc.eq(StorageLevel.NONE), mc.anyLong(), mc.anyLong()) // sizes are Long; an Int matcher could make never() pass vacuously
+  }
+
   test("reregistration on heart beat") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to