This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 46dce63  [SPARK-34193][CORE] TorrentBroadcast block manager 
decommissioning race fix
46dce63 is described below

commit 46dce636e9a03ee15a9f622a6136e832174ac90b
Author: Holden Karau <[email protected]>
AuthorDate: Thu Jan 28 06:15:35 2021 +0900

    [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix
    
    ### What changes were proposed in this pull request?
    
    Allow broadcast blocks to be put during decommissioning since migrations 
don't apply to them and they may be stored as part of job execution.
    
    ### Why are the changes needed?
    
    Potential race condition.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Removal of race condition.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #31298 from 
holdenk/SPARK-34193-torrentbroadcast-blockmanager-decommissioning-potential-race-condition.
    
    Authored-by: Holden Karau <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 9d83d62f142ba89518194f176bb81adadc28951b)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../scala/org/apache/spark/storage/BlockManager.scala   | 17 +++++++++++------
 .../org/apache/spark/storage/BlockManagerSuite.scala    | 12 ++++++++++++
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a5b8d5d..4c09e16 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -258,6 +258,15 @@ private[spark] class BlockManager(
   @inline final private def isDecommissioning() = {
     decommissioner.isDefined
   }
+
+  @inline final private def checkShouldStore(blockId: BlockId) = {
+    // Don't reject broadcast blocks since they may be stored during task exec 
and
+    // don't need to be migrated.
+    if (isDecommissioning() && !blockId.isBroadcast) {
+        throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+    }
+  }
+
   // This is a lazy val so someone can migrating RDDs even if they don't have 
a MigratableResolver
   // for shuffles. Used in BlockManagerDecommissioner & block puts.
   private[storage] lazy val migratableResolver: MigratableResolver = {
@@ -670,9 +679,7 @@ private[spark] class BlockManager(
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
 
-    if (isDecommissioning()) {
-       throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
-    }
+    checkShouldStore(blockId)
 
     if (blockId.isShuffle) {
       logDebug(s"Putting shuffle block ${blockId}")
@@ -1321,9 +1328,7 @@ private[spark] class BlockManager(
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    if (isDecommissioning()) {
-      throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
-    }
+    checkShouldStore(blockId)
 
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 44b6f1b..09678c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2038,6 +2038,18 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     
assert(sortedBlocks.sameElements(decomManager.shufflesToMigrate.asScala.map(_._1)))
   }
 
+  test("SPARK-34193: Potential race condition during decommissioning with 
TorrentBroadcast") {
+    // Validate that we allow putting of broadcast blocks during 
decommissioning
+    val exec1 = "exec1"
+
+    val store = makeBlockManager(1000, exec1)
+    master.decommissionBlockManagers(Seq(exec1))
+    val a = new Array[Byte](1)
+    // Put a broadcast block, no exception
+    val broadcast0BlockId = BroadcastBlockId(0)
+    store.putSingle(broadcast0BlockId, a, StorageLevel.DISK_ONLY)
+  }
+
   class MockBlockTransferService(
       val maxFailures: Int,
       override val hostName: String = "MockBlockTransferServiceHost") extends 
BlockTransferService {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to