Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140815652 --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala --- @@ -407,4 +407,119 @@ class MemoryStoreSuite }) assert(memoryStore.getSize(blockId) === 10000) } + + test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") { + // Setup a memory store with many blocks cached, and then one request which leads to multiple + // blocks getting evicted. We'll make the eviction throw an exception, and make sure that + // all locks are released. + val ct = implicitly[ClassTag[Array[Byte]]] + def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = { + val tc = TaskContext.empty() + val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1) + val blockInfoManager = new BlockInfoManager + blockInfoManager.registerTask(tc.taskAttemptId) + var droppedSoFar = 0 + val blockEvictionHandler = new BlockEvictionHandler { + var memoryStore: MemoryStore = _ + + override private[storage] def dropFromMemory[T: ClassTag]( + blockId: BlockId, + data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { + if (droppedSoFar < failAfterDroppingNBlocks) { + droppedSoFar += 1 + memoryStore.remove(blockId) + if (readLockAfterDrop) { + // for testing purposes, we act like another thread gets the read lock on the new + // block + StorageLevel.DISK_ONLY + } else { + StorageLevel.NONE + } + } else { + throw new RuntimeException(s"Mock error dropping block $droppedSoFar") + } + } + } + val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager, + blockEvictionHandler) { + override def afterDropAction(blockId: BlockId): Unit = { + if (readLockAfterDrop) { + // pretend that we get a read lock on the block (now on disk) in another thread + TaskContext.setTaskContext(tc) + blockInfoManager.lockForReading(blockId) + TaskContext.unset() + } + } + } + + blockEvictionHandler.memoryStore = memoryStore + memManager.setMemoryStore(memoryStore) + + // Put in some small blocks to fill up the memory store + val initialBlocks = (1 to 10).map { id => + val blockId = BlockId(s"rdd_1_$id") + val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) + val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo) + assert(initialWriteLock) + val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(10)) + }) + assert(success) + blockInfoManager.unlock(blockId, None) + } + assert(blockInfoManager.size === 10) + + + // Add one big block, which will require evicting everything in the memorystore. However our + // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared. + val largeBlockId = BlockId(s"rdd_2_1") + val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) + val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo) + assert(initialWriteLock) + if (failAfterDroppingNBlocks < 10) { + val exc = intercept[RuntimeException] { + memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(100)) + }) + } + assert(exc.getMessage().startsWith("Mock error dropping block"), exc) + // BlockManager.doPut takes care of releasing the lock for the newly written block -- not + // testing that here, so do it manually + blockInfoManager.removeBlock(largeBlockId) + } else { + memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(100)) + }) + // BlockManager.doPut takes care of releasing the lock for the newly written block -- not + // testing that here, so do it manually + blockInfoManager.unlock(largeBlockId) + } + + val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0 + val expBlocks = 10 + + (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) + + largeBlockInMemory + assert(blockInfoManager.size === expBlocks) + + val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) => + assert(info.writerTask === BlockInfo.NO_WRITER, id) + // in this test, all the blocks in memory have no reader, but everything dropped to disk + // had another thread read the block. We shouldn't lose the other thread's reader lock. --- End diff -- Thanks for clarifying @squito ... I was assuming the test was for something along those line, but good to know I did not misunderstand ! This patch looks great.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org