Repository: spark Updated Branches: refs/heads/master 18066f2e6 -> f9151bebc
[SPARK-21188][CORE] releaseAllLocksForTask should synchronize the whole method ## What changes were proposed in this pull request? Since the objects `readLocksByTask`, `writeLocksByTask` and `info`s are coupled and supposed to be modified by other threads concurrently, all the read and writes of them in the method `releaseAllLocksForTask` should be protected by a single synchronized block like other similar methods. ## How was this patch tested? existing tests Author: Feng Liu <feng...@databricks.com> Closes #18400 from liufengdb/synchronize. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9151beb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9151beb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9151beb Branch: refs/heads/master Commit: f9151bebca986d44cdab7699959fec2bc050773a Parents: 18066f2 Author: Feng Liu <feng...@databricks.com> Authored: Thu Jun 29 16:03:15 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Jun 29 16:03:15 2017 -0700 ---------------------------------------------------------------------- .../apache/spark/storage/BlockInfoManager.scala | 24 ++++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f9151beb/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 7064872..219a0e7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging { * * @return the ids of blocks whose pins were released */ - def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = { + def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized { val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]() - val readLocks = synchronized { - readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]()) - } - val writeLocks = synchronized { - writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty) - } + val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]()) + val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty) for (blockId <- writeLocks) { infos.get(blockId).foreach { info => @@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging { } blocksWithReleasedLocks += blockId } + readLocks.entrySet().iterator().asScala.foreach { entry => val blockId = entry.getElement val lockCount = entry.getCount blocksWithReleasedLocks += blockId - synchronized { - get(blockId).foreach { info => - info.readerCount -= lockCount - assert(info.readerCount >= 0) - } + get(blockId).foreach { info => + info.readerCount -= lockCount + assert(info.readerCount >= 0) } } - synchronized { - notifyAll() - } + notifyAll() + blocksWithReleasedLocks } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org