This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 48138eb1a35 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false 48138eb1a35 is described below commit 48138eb1a3561bcdbfb44a94cf94a481a8bcec98 Author: Warren Zhu <warren.zh...@gmail.com> AuthorDate: Thu Sep 28 18:51:33 2023 -0500 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false ### What changes were proposed in this pull request? Add a `keepReadLock` parameter in `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock or a potential deadlock issue. When 2 tasks try to compute the same RDD with a replication level of 2 while running on only 2 executors, a deadlock will happen. For details, refer to [SPARK-45057]: the task thread holds the write lock and waits for replication to the remote executor, while the shuffle server thread handling the block upload request waits on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24) ### Why are the changes needed? This could save an unnecessary read lock acquisition and avoid the deadlock issue mentioned above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT in BlockInfoManagerSuite ### Was this patch authored or co-authored using generative AI tooling? No Closes #43067 from warrenzhu25/deadlock. 
Authored-by: Warren Zhu <warren.zh...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> --- .../scala/org/apache/spark/storage/BlockInfoManager.scala | 11 +++++++---- .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +----- .../org/apache/spark/storage/BlockInfoManagerSuite.scala | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 9eb1418fd16..d89e6682adf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging { * then just go ahead and acquire the write lock. Otherwise, if another thread is already * writing the block, then we wait for the write to finish before acquiring the read lock. * - * @return true if the block did not already exist, false otherwise. If this returns false, then - * a read lock on the existing block will be held. If this returns true, a write lock on - * the new block will be held. + * @return true if the block did not already exist, false otherwise. + * If this returns true, a write lock on the new block will be held. + * If this returns false then a read lock will be held iff keepReadLock == true. */ def lockNewBlockForWriting( blockId: BlockId, - newBlockInfo: BlockInfo): Boolean = { + newBlockInfo: BlockInfo, + keepReadLock: Boolean = true): Boolean = { logTrace(s"Task $currentTaskAttemptId trying to put $blockId") // Get the lock that will be associated with the to-be written block and lock it for the entire // duration of this operation. 
This way we prevent race conditions when two threads try to write @@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging { val result = lockForWriting(blockId, blocking = false) assert(result.isDefined) return true + } else if (!keepReadLock) { + return false } else { // Block already exists. This could happen if another thread races with us to compute // the same block. In this case we try to acquire a read lock, if the locking succeeds diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f35131ee5b9..cdd91fb4c07 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1432,14 +1432,10 @@ private[spark] class BlockManager( val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) - if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { + if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) { newInfo } else { logWarning(s"Block $blockId already exists on this machine; not re-adding it") - if (!keepReadLock) { - // lockNewBlockForWriting returned a read lock on the existing block, so we must free it: - releaseLock(blockId) - } return None } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index 887644a8264..4238b7aa47a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -168,6 +168,20 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { assert(blockInfoManager.get("block").get.readerCount === 1) } + test("lockNewBlockForWriting should not block when keepReadLock is false") { + withTaskId(0) { + assert(blockInfoManager.lockNewBlockForWriting("block", 
newBlockInfo())) + } + val lock1Future = Future { + withTaskId(1) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false) + } + } + + assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds)) + assert(blockInfoManager.get("block").get.readerCount === 0) + } + test("read locks are reentrant") { withTaskId(1) { assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org