Repository: spark
Updated Branches:
  refs/heads/master 712995757 -> 633d63a48


http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e1b2c96..e4ab9ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
+  import BlockManagerSuite._
+
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER,
       master: BlockManagerMaster = this.master): BlockManager = {
+    val serializer = new KryoSerializer(conf)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
-    assert(store.getSingle("a1") === None, "a1 not removed from store")
-    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
+    assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
     assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
@@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
     assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
     assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
-    assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
@@ -238,15 +241,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeBlock("a3-to-remove")
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a1-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a1-to-remove"))
       master.getLocations("a1-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a2-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a2-to-remove"))
       master.getLocations("a2-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a3-to-remove") should not be (None)
+      assert(store.hasLocalBlock("a3-to-remove"))
       master.getLocations("a3-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -262,30 +265,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 0)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
       master.getLocations(rdd(0, 0)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 1)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
       master.getLocations(rdd(0, 1)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("nonrddblock") should not be (None)
+      store.getSingleAndReleaseLock("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
-    store.getSingle(rdd(0, 0)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
-    store.getSingle(rdd(0, 1)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
     master.getLocations(rdd(0, 1)) should have size 0
   }
 
@@ -305,54 +308,54 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.getLocal(broadcast0BlockId) should not be (None)
-      s.getLocal(broadcast1BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId2) should not be (None)
+      assert(s.hasLocalBlock(broadcast0BlockId))
+      assert(s.hasLocalBlock(broadcast1BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId2))
     }
 
     // remove broadcast 0 block only from executors
     master.removeBroadcast(0, removeFromMaster = false, blocking = true)
 
     // only broadcast 0 block should be removed from the executor store
-    executorStore.getLocal(broadcast0BlockId) should be (None)
-    executorStore.getLocal(broadcast1BlockId) should not be (None)
-    executorStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(!executorStore.hasLocalBlock(broadcast0BlockId))
+    assert(executorStore.hasLocalBlock(broadcast1BlockId))
+    assert(executorStore.hasLocalBlock(broadcast2BlockId))
 
     // nothing should be removed from the driver store
-    driverStore.getLocal(broadcast0BlockId) should not be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
-    driverStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
+    assert(driverStore.hasLocalBlock(broadcast2BlockId))
 
     // remove broadcast 0 block from the driver as well
     master.removeBroadcast(0, removeFromMaster = true, blocking = true)
-    driverStore.getLocal(broadcast0BlockId) should be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
+    assert(!driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
 
     // remove broadcast 1 block from both the stores asynchronously
     // and verify all broadcast 1 blocks have been removed
     master.removeBroadcast(1, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast1BlockId) should be (None)
-      executorStore.getLocal(broadcast1BlockId) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast1BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast1BlockId))
     }
 
     // remove broadcast 2 from both the stores asynchronously
     // and verify all broadcast 2 blocks have been removed
     master.removeBroadcast(2, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast2BlockId) should be (None)
-      driverStore.getLocal(broadcast2BlockId2) should be (None)
-      executorStore.getLocal(broadcast2BlockId) should be (None)
-      executorStore.getLocal(broadcast2BlockId2) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId))
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
     }
     executorStore.stop()
     driverStore.stop()
@@ -363,9 +366,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
 
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
@@ -381,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -404,12 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+          store.putIteratorAndReleaseLock(
+            "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -425,8 +429,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -437,9 +441,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -479,8 +486,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store2.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
     store2 = null
@@ -506,18 +515,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingle("a1", a1, storageLevel)
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3") === None, "a3 was in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
   }
 
   test("in-memory LRU for partitions of same RDD") {
@@ -525,34 +534,34 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
-    assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
     // Check that rdd_0_3 doesn't replace them even after further accesses
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
     // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -567,28 +576,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
-    assert(store.getSingle("a2").isDefined, "a2 was in store")
-    assert(store.getSingle("a3").isDefined, "a3 was in store")
-    assert(store.getSingle("a1").isDefined, "a1 was in store")
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
   }
 
   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
   }
 
   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
   }
 
   def testDiskAndMemoryStorage(
@@ -598,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -615,19 +624,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
-    assert(store.getSingle("a1") == None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a4").isDefined, "a4 was not in store")
+    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
   }
 
   test("in-memory LRU with streams") {
@@ -635,23 +644,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list3") === None, "list1 was in store")
   }
 
   test("LRU with mixed storage levels and streams") {
@@ -661,33 +674,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
     assert(store.get("list4").get.data.size === 2)
   }
 
@@ -705,18 +722,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
 
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
       store.stop()
@@ -724,7 +742,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
@@ -732,7 +751,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
@@ -740,28 +760,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -789,12 +810,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
     failAfter(1 second) {
-      assert(store.getSingle("a1").isEmpty, "a1 should not be in store")
+      assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
     }
   }
 
@@ -844,6 +865,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("updated block statuses") {
     store = makeBlockManager(12000)
+    store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
 
@@ -860,7 +882,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
@@ -868,7 +891,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
@@ -876,7 +900,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
@@ -890,7 +915,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
@@ -905,7 +931,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
 
@@ -929,9 +956,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -945,9 +975,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,9 +1001,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
@@ -979,9 +1015,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
@@ -991,7 +1030,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
-    assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1086,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1098,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1130,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIterator("b1", smallIterator, memOnly)
-    store.putIterator("b2", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1142,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1169,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIterator("b1", smallIterator, memAndDisk)
-    store.putIterator("b2", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1183,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1244,6 +1284,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
+    store.blockInfoManager.lockNewBlockForWriting(
+      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
     val result = memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
     })
@@ -1263,4 +1305,104 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(result.size === 10000)
     assert(result.data === Right(bytes))
   }
+
+  test("read-locked blocks cannot be evicted from the MemoryStore") {
+    store = makeBlockManager(12000)
+    val arr = new Array[Byte](4000)
+    // First store a1 and a2, both in memory, and a3, on disk only
+    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // This put should fail because both a1 and a2 should be read-locked:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a3").isEmpty, "a3 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // Release both pins of block a2:
+    store.releaseLock("a2")
+    store.releaseLock("a2")
+    // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
+    // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a2").isEmpty, "a2 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+  }
+}
+
+private object BlockManagerSuite {
+
+  private implicit class BlockManagerTestUtils(store: BlockManager) {
+
+    def putSingleAndReleaseLock(
+        block: BlockId,
+        value: Any,
+        storageLevel: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putSingle(block, value, storageLevel, tellMaster)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
+      if (store.putSingle(block, value, storageLevel)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel): Unit = {
+      if (store.putIterator(blockId, values, level)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putIterator(blockId, values, level, tellMaster)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def dropFromMemoryIfExists(
+        blockId: BlockId,
+        data: () => Either[Array[Any], ByteBuffer]): Unit = {
+      store.blockInfoManager.lockForWriting(blockId).foreach { info =>
+        val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
+        if (newEffectiveStorageLevel.isValid) {
+          // The block is still present in at least one store, so release the lock
+          // but don't delete the block info
+          store.releaseLock(blockId)
+        } else {
+          // The block isn't present in any store, so delete the block info so that the
+          // block can be stored again
+          store.blockInfoManager.removeBlock(blockId)
+        }
+      }
+    }
+
+    private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
+      val result = f(blockId)
+      if (result.isDefined) {
+        store.releaseLock(blockId)
+      }
+      result
+    }
+
+    def hasLocalBlock(blockId: BlockId): Boolean = {
+      getLocalAndReleaseLock(blockId).isDefined
+    }
+
+    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+    val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
+    val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
+    val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+  }
+
 }
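
A note for readers following the new locking model exercised by this suite: a
successful put leaves the writer holding a lock on the block, and a successful
get pins the block with a read lock, which is why every call above goes through
an ...AndReleaseLock helper. A minimal sketch of the same discipline, using
only the BlockManager calls shown in this diff (the block id and sizes are
illustrative):

    // Hedged sketch: putSingle returns true on success, after which the
    // caller still holds a lock on the block and must release it.
    val id = TestBlockId("demo")
    if (store.putSingle(id, new Array[Byte](100), StorageLevel.MEMORY_ONLY)) {
      store.releaseLock(id)
    }
    // getSingle pins the block; release the read lock after using the value.
    store.getSingle(id).foreach { _ =>
      store.releaseLock(id)
    }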

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
index f55b884..631d767 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled;
 /**
  * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
  */
-public final class NioManagedBuffer extends ManagedBuffer {
+public class NioManagedBuffer extends ManagedBuffer {
   private final ByteBuffer buf;
 
   public NioManagedBuffer(ByteBuffer buf) {
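
The only change in this file is dropping the final modifier so that
NioManagedBuffer can be subclassed. A hedged sketch, in Scala, of the kind of
subclass this permits (the class name and callback are illustrative, not part
of this diff):

    import java.nio.ByteBuffer
    import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}

    // Sketch: run a callback when release() is called, e.g. to drop a block
    // lock that was held while the buffer was exposed to the network layer.
    class CallbackManagedBuffer(buf: ByteBuffer, onRelease: () => Unit)
      extends NioManagedBuffer(buf) {

      override def release(): ManagedBuffer = {
        onRelease()      // release the associated resource first
        super.release()  // then defer to the default release behavior
      }
    }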

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 83d7953..efa2eea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -46,7 +46,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("withColumn doesn't invalidate cached dataframe") {
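
Since get() now pins the block with a read lock, isMaterialized has to release
that lock before returning, as the change above does. The same get-then-release
shape can be factored into a helper; a sketch assuming only the BlockManager
calls used in this patch (the helper name is hypothetical):

    // Hedged sketch: test for a block's presence without leaking a read lock.
    def blockExists(bm: BlockManager, blockId: BlockId): Boolean = {
      val maybeBlock = bm.get(blockId)
      maybeBlock.foreach(_ => bm.releaseLock(blockId))  // unpin if present
      maybeBlock.nonEmpty
    }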

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11863ca..86f02e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -40,7 +40,9 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("cache table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index e22e320..3d9c085 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,6 +91,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
+    } else {
+      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -189,6 +191,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
+      } else {
+        blockManager.releaseLock(blockId)
       }
     }
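
Both handlers now follow the same store-then-release shape: a failed put
raises, and a successful put leaves the receiver holding the block's lock,
which is released immediately because no task needs the block pinned
afterwards. A hedged sketch of that shape as a standalone helper (the helper
itself is not part of this commit):

    // Sketch: store a block and release the lock a successful put leaves
    // with the caller; fail loudly otherwise.
    def putAndRelease(
        bm: BlockManager,
        id: BlockId,
        values: Iterator[Any],
        level: StorageLevel): Unit = {
      if (!bm.putIterator(id, values, level, tellMaster = true)) {
        throw new SparkException(s"Could not store $id at storage level $level")
      }
      bm.releaseLock(id)
    }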
 

