Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140651608 --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala --- @@ -407,4 +407,119 @@ class MemoryStoreSuite }) assert(memoryStore.getSize(blockId) === 10000) } + + test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") { + // Setup a memory store with many blocks cached, and then one request which leads to multiple + // blocks getting evicted. We'll make the eviction throw an exception, and make sure that + // all locks are released. + val ct = implicitly[ClassTag[Array[Byte]]] + def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = { + val tc = TaskContext.empty() + val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1) + val blockInfoManager = new BlockInfoManager + blockInfoManager.registerTask(tc.taskAttemptId) + var droppedSoFar = 0 + val blockEvictionHandler = new BlockEvictionHandler { + var memoryStore: MemoryStore = _ + + override private[storage] def dropFromMemory[T: ClassTag]( + blockId: BlockId, + data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { + if (droppedSoFar < failAfterDroppingNBlocks) { + droppedSoFar += 1 + memoryStore.remove(blockId) + if (readLockAfterDrop) { + // for testing purposes, we act like another thread gets the read lock on the new + // block + StorageLevel.DISK_ONLY + } else { + StorageLevel.NONE + } + } else { + throw new RuntimeException(s"Mock error dropping block $droppedSoFar") + } + } + } + val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager, + blockEvictionHandler) { + override def afterDropAction(blockId: BlockId): Unit = { + if (readLockAfterDrop) { + // pretend that we get a read lock on the block (now on disk) in another thread + TaskContext.setTaskContext(tc) + blockInfoManager.lockForReading(blockId) + TaskContext.unset() + } + } + } + + 
blockEvictionHandler.memoryStore = memoryStore + memManager.setMemoryStore(memoryStore) + + // Put in some small blocks to fill up the memory store + val initialBlocks = (1 to 10).map { id => --- End diff -- To piggyback on @vanzin's comment: could you please also introduce a `sizePerBlock` value, so that the hard-coded 100 goes away? Thanks.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org