Repository: spark Updated Branches: refs/heads/master 712995757 -> 633d63a48
http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e1b2c96..e4ab9ee 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -45,6 +45,8 @@ import org.apache.spark.util._ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { + import BlockManagerSuite._ + var conf: SparkConf = null var store: BlockManager = null var store2: BlockManager = null @@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, master: BlockManagerMaster = this.master): BlockManager = { + val serializer = new KryoSerializer(conf) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, @@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory - assert(store.getSingle("a1").isDefined, "a1 was not in store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") - assert(store.getSingle("a3").isDefined, "a3 was not in store") + assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") // Checking whether master knows about the blocks or not assert(master.getLocations("a1").size > 0, "master was not told about a1") @@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) - assert(store.getSingle("a1") === None, "a1 not removed from store") - assert(store.getSingle("a2") === None, "a2 not removed from store") + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) + assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store") + assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") assert(master.getLocations("a2").size === 0, "master did not remove a2") } @@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2) store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") @@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory and memory size val memStatus = master.getMemoryStatus.head._2 assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000") assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000") - assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store") - assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store") - assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store") + assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store") + assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store") // Checking whether master knows about the blocks or not assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1") @@ -238,15 +241,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeBlock("a3-to-remove") eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("a1-to-remove") should be (None) + assert(!store.hasLocalBlock("a1-to-remove")) master.getLocations("a1-to-remove") should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("a2-to-remove") should be (None) + assert(!store.hasLocalBlock("a2-to-remove")) master.getLocations("a2-to-remove") should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("a3-to-remove") should not be (None) + assert(store.hasLocalBlock("a3-to-remove")) master.getLocations("a3-to-remove") should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { @@ -262,30 +265,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory. - store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) - store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle(rdd(0, 0)) should be (None) + store.getSingleAndReleaseLock(rdd(0, 0)) should be (None) master.getLocations(rdd(0, 0)) should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle(rdd(0, 1)) should be (None) + store.getSingleAndReleaseLock(rdd(0, 1)) should be (None) master.getLocations(rdd(0, 1)) should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("nonrddblock") should not be (None) + store.getSingleAndReleaseLock("nonrddblock") should not be (None) master.getLocations("nonrddblock") should have size (1) } - store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = true) - store.getSingle(rdd(0, 0)) should be (None) + store.getSingleAndReleaseLock(rdd(0, 0)) should be (None) master.getLocations(rdd(0, 0)) should have size 0 - store.getSingle(rdd(0, 1)) should be (None) + store.getSingleAndReleaseLock(rdd(0, 1)) should be (None) master.getLocations(rdd(0, 1)) should have size 0 } @@ -305,54 +308,54 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // insert broadcast blocks in both the stores Seq(driverStore, executorStore).foreach { case s => - s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY) - s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY) - s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY) - s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY) + s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY) + s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY) + s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY) + s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY) } // verify whether the blocks exist in both the stores Seq(driverStore, executorStore).foreach { case s => - s.getLocal(broadcast0BlockId) should not be (None) - s.getLocal(broadcast1BlockId) should not be (None) - s.getLocal(broadcast2BlockId) should not be (None) - s.getLocal(broadcast2BlockId2) should not be (None) + assert(s.hasLocalBlock(broadcast0BlockId)) + assert(s.hasLocalBlock(broadcast1BlockId)) + assert(s.hasLocalBlock(broadcast2BlockId)) + assert(s.hasLocalBlock(broadcast2BlockId2)) } // remove broadcast 0 block only from executors master.removeBroadcast(0, removeFromMaster = false, blocking = true) // only broadcast 0 block should be removed from the executor store - executorStore.getLocal(broadcast0BlockId) should be (None) - executorStore.getLocal(broadcast1BlockId) should not be (None) - executorStore.getLocal(broadcast2BlockId) should not be (None) + assert(!executorStore.hasLocalBlock(broadcast0BlockId)) + assert(executorStore.hasLocalBlock(broadcast1BlockId)) + assert(executorStore.hasLocalBlock(broadcast2BlockId)) // nothing should be removed from the driver store - driverStore.getLocal(broadcast0BlockId) should not be (None) - driverStore.getLocal(broadcast1BlockId) should not be (None) - driverStore.getLocal(broadcast2BlockId) should not be (None) + assert(driverStore.hasLocalBlock(broadcast0BlockId)) + assert(driverStore.hasLocalBlock(broadcast1BlockId)) + assert(driverStore.hasLocalBlock(broadcast2BlockId)) // remove broadcast 0 block from the driver as well master.removeBroadcast(0, removeFromMaster = true, blocking = true) - driverStore.getLocal(broadcast0BlockId) should be (None) - driverStore.getLocal(broadcast1BlockId) should not be (None) + assert(!driverStore.hasLocalBlock(broadcast0BlockId)) + assert(driverStore.hasLocalBlock(broadcast1BlockId)) // remove broadcast 1 block from both the stores asynchronously // and verify all broadcast 1 blocks have been removed master.removeBroadcast(1, removeFromMaster = true, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - driverStore.getLocal(broadcast1BlockId) should be (None) - executorStore.getLocal(broadcast1BlockId) should be (None) + assert(!driverStore.hasLocalBlock(broadcast1BlockId)) + assert(!executorStore.hasLocalBlock(broadcast1BlockId)) } // remove broadcast 2 from both the stores asynchronously // and verify all broadcast 2 blocks have been removed master.removeBroadcast(2, removeFromMaster = true, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - driverStore.getLocal(broadcast2BlockId) should be (None) - driverStore.getLocal(broadcast2BlockId2) should be (None) - executorStore.getLocal(broadcast2BlockId) should be (None) - executorStore.getLocal(broadcast2BlockId2) should be (None) + assert(!driverStore.hasLocalBlock(broadcast2BlockId)) + assert(!driverStore.hasLocalBlock(broadcast2BlockId2)) + assert(!executorStore.hasLocalBlock(broadcast2BlockId)) + assert(!executorStore.hasLocalBlock(broadcast2BlockId2)) } executorStore.stop() driverStore.stop() @@ -363,9 +366,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(2000) val a1 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) - assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") assert(master.getLocations("a1").size > 0, "master was not told about a1") master.removeExecutor(store.blockManagerId.executorId) @@ -381,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.getLocations("a1").size > 0, "master was not told about a1") master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY) store.waitForAsyncReregister() assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") @@ -404,12 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE master.removeExecutor(store.blockManagerId.executorId) val t1 = new Thread { override def run() { - store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } } val t2 = new Thread { override def run() { - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) } } val t3 = new Thread { @@ -425,8 +429,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE t2.join() t3.join() - store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) store.waitForAsyncReregister() } } @@ -437,9 +441,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500)) val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray) val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray) - store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val list1Get = store.get("list1") assert(list1Get.isDefined, "list1 expected to be in store") assert(list1Get.get.data.size === 2) @@ -479,8 +486,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store2 = makeBlockManager(8000, "executor2") store3 = makeBlockManager(8000, "executor3") val list1 = List(new Array[Byte](4000)) - store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store2.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store3.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched") store2.stop() store2 = null @@ -506,18 +515,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingle("a1", a1, storageLevel) - store.putSingle("a2", a2, storageLevel) - store.putSingle("a3", a3, storageLevel) - assert(store.getSingle("a2").isDefined, "a2 was not in store") - assert(store.getSingle("a3").isDefined, "a3 was not in store") - assert(store.getSingle("a1") === None, "a1 was in store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") + store.putSingleAndReleaseLock("a1", a1, storageLevel) + store.putSingleAndReleaseLock("a2", a2, storageLevel) + store.putSingleAndReleaseLock("a3", a3, storageLevel) + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") + assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 - store.putSingle("a1", a1, storageLevel) - assert(store.getSingle("a1").isDefined, "a1 was not in store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") - assert(store.getSingle("a3") === None, "a3 was in store") + store.putSingleAndReleaseLock("a1", a1, storageLevel) + assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store") } test("in-memory LRU for partitions of same RDD") { @@ -525,34 +534,34 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 // from the same RDD - assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") - assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") - assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store") + assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") + assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store") // Check that rdd_0_3 doesn't replace them even after further accesses - assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") - assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") - assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store") } test("in-memory LRU for partitions of multiple RDDs") { store = makeBlockManager(12000) - store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // At this point rdd_1_1 should've replaced rdd_0_1 assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store") assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store") // Do a get() on rdd_0_2 so that it is the most recently used item - assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") + assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") // Put in more partitions from RDD 0; they should replace rdd_1_1 - store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store") @@ -567,28 +576,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("a1", a1, StorageLevel.DISK_ONLY) - store.putSingle("a2", a2, StorageLevel.DISK_ONLY) - store.putSingle("a3", a3, StorageLevel.DISK_ONLY) - assert(store.getSingle("a2").isDefined, "a2 was in store") - assert(store.getSingle("a3").isDefined, "a3 was in store") - assert(store.getSingle("a1").isDefined, "a1 was in store") + store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY) + store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY) + store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY) + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store") + assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store") + assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store") } test("disk and memory storage") { - testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle) + testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock) } test("disk and memory storage with getLocalBytes") { - testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes) + testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock) } test("disk and memory storage with serialization") { - testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle) + testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock) } test("disk and memory storage with serialization and getLocalBytes") { - testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes) + testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock) } def testDiskAndMemoryStorage( @@ -598,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](4000) val a2 = new Array[Byte](4000) val a3 = new Array[Byte](4000) - store.putSingle("a1", a1, storageLevel) - store.putSingle("a2", a2, storageLevel) - store.putSingle("a3", a3, storageLevel) + store.putSingleAndReleaseLock("a1", a1, storageLevel) + store.putSingleAndReleaseLock("a2", a2, storageLevel) + store.putSingleAndReleaseLock("a3", a3, storageLevel) assert(accessMethod(store)("a2").isDefined, "a2 was not in store") assert(accessMethod(store)("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store") @@ -615,19 +624,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) val a4 = new Array[Byte](4000) // First store a1 and a2, both in memory, and a3, on disk only - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) - store.putSingle("a3", a3, StorageLevel.DISK_ONLY) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY) // At this point LRU should not kick in because a3 is only on disk - assert(store.getSingle("a1").isDefined, "a1 was not in store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") - assert(store.getSingle("a3").isDefined, "a3 was not in store") + assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out - store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) - assert(store.getSingle("a1") == None, "a1 was in store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") - assert(store.getSingle("a3").isDefined, "a3 was not in store") - assert(store.getSingle("a4").isDefined, "a4 was not in store") + store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) + assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") + assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store") } test("in-memory LRU with streams") { @@ -635,23 +644,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) val list2 = List(new Array[Byte](2000), new Array[Byte](2000)) val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) - store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(store.get("list2").isDefined, "list2 was not in store") + store.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) - assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) - assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.getAndReleaseLock("list1") === None, "list1 was in store") + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(store.get("list1").isDefined, "list1 was not in store") + store.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.data.size === 2) - assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) - assert(store.get("list3") === None, "list1 was in store") + assert(store.getAndReleaseLock("list3") === None, "list1 was in store") } test("LRU with mixed storage levels and streams") { @@ -661,33 +674,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) val list4 = List(new Array[Byte](2000), new Array[Byte](2000)) // First store list1 and list2, both in memory, and list3, on disk only - store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.putIteratorAndReleaseLock( + "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val listForSizeEstimate = new ArrayBuffer[Any] listForSizeEstimate ++= list1.iterator val listSize = SizeEstimator.estimate(listForSizeEstimate) // At this point LRU should not kick in because list3 is only on disk - assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.data.size === 2) - assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) - assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) - assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.data.size === 2) - assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) - assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) - assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2").isDefined, "list2 was not in store") + store.putIteratorAndReleaseLock( + "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) + assert(store.getAndReleaseLock("list1") === None, "list1 was in store") + assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) - assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) - assert(store.get("list4").isDefined, "list4 was not in store") + assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store") assert(store.get("list4").get.data.size === 2) } @@ -705,18 +722,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("overly large block") { store = makeBlockManager(5000) - store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) - assert(store.getSingle("a1") === None, "a1 was in store") - store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK) + store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) + assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store") + store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") - assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") } test("block compression") { try { conf.set("spark.shuffle.compress", "true") store = makeBlockManager(20000, "exec1") - store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock( + ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") store.stop() @@ -724,7 +742,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.shuffle.compress", "false") store = makeBlockManager(20000, "exec2") - store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock( + ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000, "shuffle_0_0_0 was compressed") store.stop() @@ -732,7 +751,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.broadcast.compress", "true") store = makeBlockManager(20000, "exec3") - store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock( + BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000, "broadcast_0 was not compressed") store.stop() @@ -740,28 +760,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.broadcast.compress", "false") store = makeBlockManager(20000, "exec4") - store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock( + BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed") store.stop() store = null conf.set("spark.rdd.compress", "true") store = makeBlockManager(20000, "exec5") - store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed") store.stop() store = null conf.set("spark.rdd.compress", "false") store = makeBlockManager(20000, "exec6") - store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed store = makeBlockManager(20000, "exec7") - store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed") store.stop() store = null @@ -789,12 +810,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE class UnserializableClass val a1 = new UnserializableClass intercept[java.io.NotSerializableException] { - store.putSingle("a1", a1, StorageLevel.DISK_ONLY) + store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY) } // Make sure get a1 doesn't hang and returns None. failAfter(1 second) { - assert(store.getSingle("a1").isEmpty, "a1 should not be in store") + assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store") } } @@ -844,6 +865,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("updated block statuses") { store = makeBlockManager(12000) + store.registerTask(0) val list = List.fill(2)(new Array[Byte](2000)) val bigList = List.fill(8)(new Array[Byte](2000)) @@ -860,7 +882,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 1 updated block (i.e. list1) val updatedBlocks1 = getUpdatedBlocks { - store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks1.size === 1) assert(updatedBlocks1.head._1 === TestBlockId("list1")) @@ -868,7 +891,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 1 updated block (i.e. list2) val updatedBlocks2 = getUpdatedBlocks { - store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) } assert(updatedBlocks2.size === 1) assert(updatedBlocks2.head._1 === TestBlockId("list2")) @@ -876,7 +900,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 2 updated blocks - list1 is kicked out of memory while list3 is added val updatedBlocks3 = getUpdatedBlocks { - store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks3.size === 2) updatedBlocks3.foreach { case (id, status) => @@ -890,7 +915,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added val updatedBlocks4 = getUpdatedBlocks { - store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks4.size === 2) updatedBlocks4.foreach { case (id, status) => @@ -905,7 +931,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // No updated blocks - list5 is too big to fit in store and nothing is kicked out val updatedBlocks5 = getUpdatedBlocks { - store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } assert(updatedBlocks5.size === 0) @@ -929,9 +956,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](2000)) // Tell master. By LRU, only list2 and list3 remains. - store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) // getLocations and getBlockStatus should yield the same locations assert(store.master.getLocations("list1").size === 0) @@ -945,9 +975,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1) // This time don't tell master and see what happens. By LRU, only list5 and list6 remains. - store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) - store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putIteratorAndReleaseLock( + "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putIteratorAndReleaseLock( + "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIteratorAndReleaseLock( + "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) // getLocations should return nothing because the master is not informed // getBlockStatus without asking slaves should have the same result @@ -968,9 +1001,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](100)) // insert some blocks - store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) // getLocations and getBlockStatus should yield the same locations assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size @@ -979,9 +1015,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE === 1) // insert some more blocks - store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIteratorAndReleaseLock( + "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIteratorAndReleaseLock( + "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIteratorAndReleaseLock( + "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) // getLocations and getBlockStatus should yield the same locations assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size @@ -991,7 +1030,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0)) blockIds.foreach { blockId => - store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIteratorAndReleaseLock( + blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } val matchedBlockIds = store.master.getMatchingBlockIds(_ match { case RDDBlockId(1, _) => true @@ -1002,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { store = makeBlockManager(12000) - store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // Access rdd_1_0 to ensure it's not least recently used. - assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") + assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") // According to the same-RDD rule, rdd_1_0 should be replaced here. - store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // rdd_1_0 should have been replaced, even it's not least recently used. assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store") assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store") @@ -1086,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll with not enough space. This should succeed after kicking out someBlock1. - store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) - store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) @@ -1098,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator @@ -1130,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // would not know how to drop them from memory later. memoryStore.remove("b1") memoryStore.remove("b2") - store.putIterator("b1", smallIterator, memOnly) - store.putIterator("b2", smallIterator, memOnly) + store.putIteratorAndReleaseLock("b1", smallIterator, memOnly) + store.putIteratorAndReleaseLock("b2", smallIterator, memOnly) // Unroll with not enough space. This should succeed but kick out b1 in the process. val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) @@ -1142,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.contains("b3")) assert(memoryStore.currentUnrollMemoryForThisTask === 0) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, memOnly) + store.putIteratorAndReleaseLock("b3", smallIterator, memOnly) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) @@ -1169,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - store.putIterator("b1", smallIterator, memAndDisk) - store.putIterator("b2", smallIterator, memAndDisk) + store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk) + store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk) // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 @@ -1183,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!diskStore.contains("b2")) assert(!diskStore.contains("b3")) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) + store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll huge block with not enough space. This should fail and drop the new block to disk @@ -1244,6 +1284,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(12000) val memoryStore = store.memoryStore val blockId = BlockId("rdd_3_10") + store.blockInfoManager.lockNewBlockForWriting( + blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)) val result = memoryStore.putBytes(blockId, 13000, () => { fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") }) @@ -1263,4 +1305,104 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(result.size === 10000) assert(result.data === Right(bytes)) } + + test("read-locked blocks cannot be evicted from the MemoryStore") { + store = makeBlockManager(12000) + val arr = new Array[Byte](4000) + // First store a1 and a2, both in memory, and a3, on disk only + store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER) + store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER) + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + // This put should fail because both a1 and a2 should be read-locked: + store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER) + assert(store.getSingle("a3").isEmpty, "a3 was in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + // Release both pins of block a2: + store.releaseLock("a2") + store.releaseLock("a2") + // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before + // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead: + store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER) + assert(store.getSingle("a2").isEmpty, "a2 was in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") + } +} + +private object BlockManagerSuite { + + private implicit class BlockManagerTestUtils(store: BlockManager) { + + def putSingleAndReleaseLock( + block: BlockId, + value: Any, + storageLevel: StorageLevel, + tellMaster: Boolean): Unit = { + if (store.putSingle(block, value, storageLevel, tellMaster)) { + store.releaseLock(block) + } + } + + def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = { + if (store.putSingle(block, value, storageLevel)) { + store.releaseLock(block) + } + } + + def putIteratorAndReleaseLock( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel): Unit = { + if (store.putIterator(blockId, values, level)) { + store.releaseLock(blockId) + } + } + + def putIteratorAndReleaseLock( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + tellMaster: Boolean): Unit = { + if (store.putIterator(blockId, values, level, tellMaster)) { + store.releaseLock(blockId) + } + } + + def dropFromMemoryIfExists( + blockId: BlockId, + data: () => Either[Array[Any], ByteBuffer]): Unit = { + store.blockInfoManager.lockForWriting(blockId).foreach { info => + val newEffectiveStorageLevel = store.dropFromMemory(blockId, data) + if (newEffectiveStorageLevel.isValid) { + // The block is still present in at least one store, so release the lock + // but don't delete the block info + store.releaseLock(blockId) + } else { + // The block isn't present in any store, so delete the block info so that the + // block can be stored again + store.blockInfoManager.removeBlock(blockId) + } + } + } + + private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => { + val result = f(blockId) + if (result.isDefined) { + store.releaseLock(blockId) + } + result + } + + def hasLocalBlock(blockId: BlockId): Boolean = { + getLocalAndReleaseLock(blockId).isDefined + } + + val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal) + val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get) + val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle) + val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes) + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java index f55b884..631d767 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java @@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled; /** * A {@link ManagedBuffer} backed by {@link ByteBuffer}. */ -public final class NioManagedBuffer extends ManagedBuffer { +public class NioManagedBuffer extends ManagedBuffer { private final ByteBuffer buf; public NioManagedBuffer(ByteBuffer buf) { http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 83d7953..efa2eea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -46,7 +46,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } def isMaterialized(rddId: Int): Boolean = { - sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty + val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)) + maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0))) + maybeBlock.nonEmpty } test("withColumn doesn't invalidate cached dataframe") { http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 11863ca..86f02e6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -40,7 +40,9 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { } def isMaterialized(rddId: Int): Boolean = { - sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty + val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)) + maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0))) + maybeBlock.nonEmpty } test("cache table") { http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index e22e320..3d9c085 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -91,6 +91,8 @@ private[streaming] class BlockManagerBasedBlockHandler( if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") + } else { + blockManager.releaseLock(blockId) } BlockManagerBasedStoreResult(blockId, numRecords) } @@ -189,6 +191,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler( if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") + } else { + blockManager.releaseLock(blockId) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org