Github user squito commented on a diff in the pull request:
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -407,4 +407,119 @@ class MemoryStoreSuite
         assert(memoryStore.getSize(blockId) === 10000)
    +  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
    +    // Setup a memory store with many blocks cached, and then one request 
which leads to multiple
    +    // blocks getting evicted.  We'll make the eviction throw an 
exception, and make sure that
    +    // all locks are released.
    +    val ct = implicitly[ClassTag[Array[Byte]]]
    +    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, 
readLockAfterDrop: Boolean): Unit = {
    +      val tc = TaskContext.empty()
    +      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, 
numCores = 1)
    +      val blockInfoManager = new BlockInfoManager
    +      blockInfoManager.registerTask(tc.taskAttemptId)
    +      var droppedSoFar = 0
    +      val blockEvictionHandler = new BlockEvictionHandler {
    +        var memoryStore: MemoryStore = _
    +        override private[storage] def dropFromMemory[T: ClassTag](
    +            blockId: BlockId,
    +            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel 
= {
    +          if (droppedSoFar < failAfterDroppingNBlocks) {
    +            droppedSoFar += 1
    +            memoryStore.remove(blockId)
    +            if (readLockAfterDrop) {
    +              // for testing purposes, we act like another thread gets the 
read lock on the new
    +              // block
    +              StorageLevel.DISK_ONLY
    +            } else {
    +              StorageLevel.NONE
    +            }
    +          } else {
    +            throw new RuntimeException(s"Mock error dropping block 
    +          }
    +        }
    +      }
    +      val memoryStore = new MemoryStore(conf, blockInfoManager, 
serializerManager, memManager,
    +          blockEvictionHandler) {
    +        override def afterDropAction(blockId: BlockId): Unit = {
    +          if (readLockAfterDrop) {
    +            // pretend that we get a read lock on the block (now on disk) 
in another thread
    +            TaskContext.setTaskContext(tc)
    +            blockInfoManager.lockForReading(blockId)
    +            TaskContext.unset()
    +          }
    +        }
    +      }
    +      blockEvictionHandler.memoryStore = memoryStore
    +      memManager.setMemoryStore(memoryStore)
    +      // Put in some small blocks to fill up the memory store
    +      val initialBlocks = (1 to 10).map { id =>
    +        val blockId = BlockId(s"rdd_1_$id")
    +        val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, 
tellMaster = false)
    +        val initialWriteLock = 
blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
    +        assert(initialWriteLock)
    +        val success = memoryStore.putBytes(blockId, 10, 
MemoryMode.ON_HEAP, () => {
    +          new ChunkedByteBuffer(ByteBuffer.allocate(10))
    +        })
    +        assert(success)
    +        blockInfoManager.unlock(blockId, None)
    +      }
    +      assert(blockInfoManager.size === 10)
    +      // Add one big block, which will require evicting everything in the 
memorystore.  However our
    +      // mock BlockEvictionHandler will throw an exception -- make sure 
all locks are cleared.
    +      val largeBlockId = BlockId(s"rdd_2_1")
    +      val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, 
tellMaster = false)
    +      val initialWriteLock = 
blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
    +      assert(initialWriteLock)
    +      if (failAfterDroppingNBlocks < 10) {
    +        val exc = intercept[RuntimeException] {
    +          memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () 
=> {
    +            new ChunkedByteBuffer(ByteBuffer.allocate(100))
    +          })
    +        }
    +        assert(exc.getMessage().startsWith("Mock error dropping block"), 
    +        // BlockManager.doPut takes care of releasing the lock for the 
newly written block -- not
    +        // testing that here, so do it manually
    +        blockInfoManager.removeBlock(largeBlockId)
    +      } else {
    +        memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
    +          new ChunkedByteBuffer(ByteBuffer.allocate(100))
    +        })
    +        // BlockManager.doPut takes care of releasing the lock for the 
newly written block -- not
    +        // testing that here, so do it manually
    +        blockInfoManager.unlock(largeBlockId)
    +      }
    +      val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
    +      val expBlocks = 10 +
    +        (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) +
    +        largeBlockInMemory
    +      assert(blockInfoManager.size === expBlocks)
    +      val blocksStillInMemory = blockInfoManager.entries.filter { case 
(id, info) =>
    +        assert(info.writerTask === BlockInfo.NO_WRITER, id)
    +        // in this test, all the blocks in memory have no reader, but 
everything dropped to disk
    +        // had another thread read the block.  We shouldn't lose the other 
thread's reader lock.
    --- End diff --
    In an earlier version of this, I was always unconditionally releasing all 
locks that were held by anything in the `finally`.  I've changed it to only 
release locks that this thread holds, and this part of the test is to verify 
that.  We simulate another thread grabbing a lock on the blocks which get 
successfully dropped (just a read lock in this case, though it doesn't really 
matter).  The test makes sure that even though we drop some of the remaining 
locks owned by this thread in the `finally`, the other thread still keeps its 
read lock.
    Yes, there are many other possible interleavings of locks with 
other threads, but that's not the point of this test case.  It's to make sure 
that the `finally` block releases only the correct set of locks.


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to