This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 48138eb1a35 [SPARK-45057][CORE] Avoid acquire read lock when 
keepReadLock is false
48138eb1a35 is described below

commit 48138eb1a3561bcdbfb44a94cf94a481a8bcec98
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

    [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
    
    ### What changes were proposed in this pull request?
    Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When 
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read 
lock or a potential deadlock issue.
    
    When 2 tasks try to compute the same RDD with a replication level of 2 while 
running on only 2 executors, a deadlock will happen. For details, refer to [SPARK-45057]
    
    The task thread holds the write lock while waiting for replication to a remote 
executor, while the shuffle server thread handling the block upload request waits 
on `lockForReading` in 
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)
    
    ### Why are the changes needed?
    This could save an unnecessary read lock acquisition and avoid the deadlock 
issue mentioned above.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT in BlockInfoManagerSuite
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43067 from warrenzhu25/deadlock.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++++++----
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-----
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 9eb1418fd16..d89e6682adf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
    * then just go ahead and acquire the write lock. Otherwise, if another 
thread is already
    * writing the block, then we wait for the write to finish before acquiring 
the read lock.
    *
-   * @return true if the block did not already exist, false otherwise. If this 
returns false, then
-   *         a read lock on the existing block will be held. If this returns 
true, a write lock on
-   *         the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   *         If this returns true, a write lock on the new block will be held.
+   *         If this returns false then a read lock will be held iff 
keepReadLock == true.
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
-      newBlockInfo: BlockInfo): Boolean = {
+      newBlockInfo: BlockInfo,
+      keepReadLock: Boolean = true): Boolean = {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
     // Get the lock that will be associated with the to-be written block and 
lock it for the entire
     // duration of this operation. This way we prevent race conditions when 
two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
           val result = lockForWriting(blockId, blocking = false)
           assert(result.isDefined)
           return true
+        } else if (!keepReadLock) {
+          return false
         } else {
           // Block already exists. This could happen if another thread races 
with us to compute
           // the same block. In this case we try to acquire a read lock, if 
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f35131ee5b9..cdd91fb4c07 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1432,14 +1432,10 @@ private[spark] class BlockManager(
 
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
-      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, 
keepReadLock)) {
         newInfo
       } else {
         logWarning(s"Block $blockId already exists on this machine; not 
re-adding it")
-        if (!keepReadLock) {
-          // lockNewBlockForWriting returned a read lock on the existing 
block, so we must free it:
-          releaseLock(blockId)
-        }
         return None
       }
     }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 887644a8264..4238b7aa47a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -168,6 +168,20 @@ class BlockInfoManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     assert(blockInfoManager.get("block").get.readerCount === 1)
   }
 
+  test("lockNewBlockForWriting should not block when keepReadLock is false") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val lock1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false)
+      }
+    }
+
+    assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
+    assert(blockInfoManager.get("block").get.readerCount === 0)
+  }
+
   test("read locks are reentrant") {
     withTaskId(1) {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to