This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 752e8a1a7ee0 [SPARK-53807][SPARK-50771][CORE] Fix race condition issues between `unlock` and `releaseAllLocksForTask` in `BlockInfoManager`
752e8a1a7ee0 is described below
commit 752e8a1a7ee030ab8d0879c569754edce7b0b0f4
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Jan 6 14:39:32 2026 +0900
[SPARK-53807][SPARK-50771][CORE] Fix race condition issues between `unlock` and `releaseAllLocksForTask` in `BlockInfoManager`
### What changes were proposed in this pull request?
This PR fixes race condition issues between `unlock` and
`releaseAllLocksForTask` in `BlockInfoManager`.
If read locks for a block acquired by a task are released by `unlock` and `releaseAllLocksForTask` concurrently, an assertion error can happen.
The reason is that calling `entry.getCount` in `releaseAllLocksForTask` can return a stale value even after the count in the entry has been decreased by `countsForTask.remove` on another thread. As a result, `info.readerCount -= lockCount` can produce a negative number, causing an assertion error.
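As a minimal standalone sketch of that stale-snapshot behavior (illustrative names, assuming only Guava on the classpath; not the actual Spark code):
```scala
import com.google.common.collect.ConcurrentHashMultiset

object StaleEntryCountSketch {
  def main(args: Array[String]): Unit = {
    val readLocks = ConcurrentHashMultiset.create[String]()
    readLocks.add("block", 2)

    // Snapshot an Entry, as releaseAllLocksForTask does via entrySet().
    val entry = readLocks.entrySet().iterator().next()

    // Another thread (unlock) decrements the count after the snapshot.
    readLocks.remove("block", 1)

    // Per the Javadoc quoted below, the snapshot may or may not reflect
    // the change, while count() always returns the current value.
    println(s"entry.getCount = ${entry.getCount}, live count = ${readLocks.count("block")}")
  }
}
```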
This issue can be reproduced by inserting sleeps into `unlock` and `releaseAllLocksForTask` as follows.
* unlock
```
// reader counts. We need to check if the readLocksByTask per tasks are present, if they
// are not then we know releaseAllLocksForTask has already cleaned up the read lock.
val countsForTask = readLocksByTask.get(taskAttemptId)
+ Thread.sleep(5)
if (countsForTask != null) {
  assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
  info.readerCount -= 1
```
* releaseAllLocksForTask
```
+ Thread.sleep(5)
val readLocks = Option(readLocksByTask.remove(taskAttemptId))
  .getOrElse(ImmutableMultiset.of[BlockId])
readLocks.entrySet().forEach { entry =>
  val blockId = entry.getElement
  val lockCount = entry.getCount
+ Thread.sleep(5)
  blocksWithReleasedLocks += blockId
```
Then, run the test as follows.
```
$ build/sbt 'core/testOnly org.apache.spark.storage.BlockInfoManagerSuite -- -z SPARK-38675'
```
The Javadoc for [ConcurrentHashMultiset#entrySet](https://guava.dev/releases/33.4.0-jre/api/docs/com/google/common/collect/ConcurrentHashMultiset.html) says:
```
However, multiset changes may or may not be reflected in any Entry instances already retrieved from the entry set (this is implementation-dependent)
```
So, this PR calculates `lockCount` by calling `readLocks.count` to get the latest count, and places the call within the `blockInfo` block for exclusive execution.
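As a distilled, hypothetical model of that fixed release loop (a plain `synchronized` block stands in for the exclusive `blockInfo` section; names are illustrative):
```scala
import com.google.common.collect.ConcurrentHashMultiset
import scala.collection.mutable

object FixedReleaseLoopSketch {
  def main(args: Array[String]): Unit = {
    val lock = new Object
    val readerCounts = mutable.Map("block" -> 2)
    val readLocks = ConcurrentHashMultiset.create[String]()
    readLocks.add("block", 2)

    readLocks.entrySet().forEach { entry =>
      val blockId = entry.getElement
      lock.synchronized {
        // Read the live count inside the exclusive section instead of
        // trusting the possibly stale entry.getCount snapshot.
        val lockCount = readLocks.count(blockId)
        if (lockCount > 0) { // 0 means a concurrent unlock already released them
          readerCounts(blockId) -= lockCount
          assert(readerCounts(blockId) >= 0)
        }
      }
    }
    println(readerCounts) // Map(block -> 0)
  }
}
```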
Similar to read locks, a race condition issue can happen for write locks.
During `writeLocks.forEach` in `releaseAllLocksForTask`, a `blockId` can be
removed from `writeLocks` by
`writeLocksByTask.get(taskAttemptId).remove(blockId)` in `unlock` on another
thread.
You can reproduce this issue with the new test added in this PR.
This PR fixes this issue by checking for the existence of the `blockId` with `writeLocks.contains(blockId)` within the `blockInfo` block.
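A minimal standalone sketch of the sharing that enables this race (the map and names below are illustrative stand-ins for the actual Spark fields):
```scala
import java.util.concurrent.ConcurrentHashMap

object WriteLockRaceSketch {
  def main(args: Array[String]): Unit = {
    val writeLocksByTask = new ConcurrentHashMap[Long, java.util.Set[String]]()
    val locks = ConcurrentHashMap.newKeySet[String]()
    locks.add("block")
    writeLocksByTask.put(0L, locks)

    // unlock: grabs a reference to the shared, mutable set...
    val setSeenByUnlock = writeLocksByTask.get(0L)

    // releaseAllLocksForTask: removes the mapping but keeps iterating
    // over the very same set instance.
    val writeLocks = writeLocksByTask.remove(0L)

    // ...then unlock removes the blockId mid-iteration on another thread.
    setSeenByUnlock.remove("block")

    // The fix therefore re-checks membership inside the exclusive section
    // before releasing the write lock.
    println(s"still held: ${writeLocks.contains("block")}") // prints false
  }
}
```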
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed `SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls should not fail` passes even if sleeps are inserted into `unlock` and `releaseAllLocksForTask` as follows.
* unlock
```
val countsForTask = readLocksByTask.get(taskAttemptId)
if (countsForTask != null) {
+ Thread.sleep(5)
  assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
  info.readerCount -= 1
  val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
```
* releaseAllLocksForTask
```
+ Thread.sleep(5)
val readLocks = Option(readLocksByTask.remove(taskAttemptId))
  .getOrElse(ImmutableMultiset.of[BlockId])
readLocks.entrySet().forEach { entry =>
  val blockId = entry.getElement
```
```
// Using readLocks.count instead of entry.getCount is intentional. See discussion in
// SPARK-50771.
val lockCount = readLocks.count(blockId)
+ Thread.sleep(5)
// lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
if (lockCount > 0) {
  blocksWithReleasedLocks += blockId
  info.readerCount -= lockCount
```
Also, a new test for write locks is added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52524 from sarutak/fix-race-blockinfomanager.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../apache/spark/storage/BlockInfoManager.scala | 33 +++++++++++++++-------
.../spark/storage/BlockInfoManagerSuite.scala | 20 +++++++++++++
2 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 322e4b18c5d4..69250e247573 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -397,7 +397,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
         info.writerTask = BlockInfo.NO_WRITER
-        writeLocksByTask.get(taskAttemptId).remove(blockId)
+        val blockIds = writeLocksByTask.get(taskAttemptId)
+        if (blockIds != null) {
+          blockIds.remove(blockId)
+        }
       } else {
         // There can be a race between unlock and releaseAllLocksForTask which causes negative
         // reader counts. We need to check if the readLocksByTask per tasks are present, if they
@@ -489,23 +492,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(util.Set.of())
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        assert(info.writerTask == taskAttemptId)
-        info.writerTask = BlockInfo.NO_WRITER
-        condition.signalAll()
+        // Check the existence of `blockId` because `unlock` may have already removed it
+        // concurrently.
+        if (writeLocks.contains(blockId)) {
+          blocksWithReleasedLocks += blockId
+          assert(info.writerTask == taskAttemptId)
+          info.writerTask = BlockInfo.NO_WRITER
+          condition.signalAll()
+        }
       }
-      blocksWithReleasedLocks += blockId
     }

     val readLocks = Option(readLocksByTask.remove(taskAttemptId))
       .getOrElse(ImmutableMultiset.of[BlockId])
     readLocks.entrySet().forEach { entry =>
       val blockId = entry.getElement
-      val lockCount = entry.getCount
-      blocksWithReleasedLocks += blockId
       blockInfo(blockId) { (info, condition) =>
-        info.readerCount -= lockCount
-        assert(info.readerCount >= 0)
-        condition.signalAll()
+        // Calculating lockCount by readLocks.count instead of entry.getCount is intentional. See
+        // discussion in SPARK-50771 and the corresponding PR.
+        val lockCount = readLocks.count(blockId)
+
+        // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
+        if (lockCount > 0) {
+          blocksWithReleasedLocks += blockId
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+          condition.signalAll()
+        }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index f7c7ca2bd936..4b34c13706ad 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -403,4 +403,24 @@ class BlockInfoManagerSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("SPARK-53807 - concurrent unlock and releaseAllLocksForTask for write should not fail") {
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix the block below almost always fails.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        assert(blockInfoManager.lockForWriting(blockId).isDefined)
+
+        val future = Future(blockInfoManager.unlock(blockId, Option(task)))
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        ThreadUtils.awaitReady(future, 100.millis)
+      }
+    }
+  }
 }