Repository: spark
Updated Branches:
  refs/heads/master 68be5b9e8 -> 78c1076d0
[SPARK-14252] Executors do not try to download remote cached blocks

## What changes were proposed in this pull request?

As mentioned in the ticket, this was because one get path in the refactored `BlockManager` did not check for remote storage.

## How was this patch tested?

Unit test, also verified manually with reproduction in the ticket.

cc JoshRosen

Author: Eric Liang <e...@databricks.com>

Closes #12193 from ericl/spark-14252.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78c1076d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78c1076d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78c1076d

Branch: refs/heads/master
Commit: 78c1076d0421cc41cbdb788f38b13c9a00e8f561
Parents: 68be5b9
Author: Eric Liang <e...@databricks.com>
Authored: Tue Apr 5 22:37:51 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Apr 5 22:37:51 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/BlockManager.scala | 8 ++++++++
 .../org/apache/spark/storage/BlockManagerSuite.scala | 13 +++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/78c1076d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9608418..35a6c63 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -643,6 +643,14 @@ private[spark] class BlockManager(
       level: StorageLevel,
       classTag: ClassTag[T],
       makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+    // Attempt to read the block from local or remote storage. If it's present, then we don't need
+    // to go through the local-get-or-put path.
+    get(blockId) match {
+      case Some(block) =>
+        return Left(block)
+      case _ =>
+        // Need to compute the block.
+    }
     // Initially we hold no locks on this block.
     doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
       case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/78c1076d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 32c00ac..66b28de 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -515,6 +515,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    val list1 = List(new Array[Byte](4000))
+    store2.putIterator(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getOrElseUpdate(
+      "list1",
+      StorageLevel.MEMORY_ONLY,
+      ClassTag.Any,
+      () => throw new AssertionError("attempted to compute locally")).isLeft)
+  }
+
   test("in-memory LRU storage") {
     testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org