Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19311#discussion_r140815652
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -407,4 +407,119 @@ class MemoryStoreSuite
         })
         assert(memoryStore.getSize(blockId) === 10000)
       }
    +
    +  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
    +    // Setup a memory store with many blocks cached, and then one request which leads to multiple
    +    // blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
    +    // all locks are released.
    +    val ct = implicitly[ClassTag[Array[Byte]]]
    +    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
    +      val tc = TaskContext.empty()
    +      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
    +      val blockInfoManager = new BlockInfoManager
    +      blockInfoManager.registerTask(tc.taskAttemptId)
    +      var droppedSoFar = 0
    +      val blockEvictionHandler = new BlockEvictionHandler {
    +        var memoryStore: MemoryStore = _
    +
    +        override private[storage] def dropFromMemory[T: ClassTag](
    +            blockId: BlockId,
    +            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
    +          if (droppedSoFar < failAfterDroppingNBlocks) {
    +            droppedSoFar += 1
    +            memoryStore.remove(blockId)
    +            if (readLockAfterDrop) {
    +              // for testing purposes, we act like another thread gets the read lock on the new
    +              // block
    +              StorageLevel.DISK_ONLY
    +            } else {
    +              StorageLevel.NONE
    +            }
    +          } else {
    +            throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
    +          }
    +        }
    +      }
    +      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
    +          blockEvictionHandler) {
    +        override def afterDropAction(blockId: BlockId): Unit = {
    +          if (readLockAfterDrop) {
    +            // pretend that we get a read lock on the block (now on disk) in another thread
    +            TaskContext.setTaskContext(tc)
    +            blockInfoManager.lockForReading(blockId)
    +            TaskContext.unset()
    +          }
    +        }
    +      }
    +
    +      blockEvictionHandler.memoryStore = memoryStore
    +      memManager.setMemoryStore(memoryStore)
    +
    +      // Put in some small blocks to fill up the memory store
    +      val initialBlocks = (1 to 10).map { id =>
    +        val blockId = BlockId(s"rdd_1_$id")
    +        val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
    +        val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
    +        assert(initialWriteLock)
    +        val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => {
    +          new ChunkedByteBuffer(ByteBuffer.allocate(10))
    +        })
    +        assert(success)
    +        blockInfoManager.unlock(blockId, None)
    +      }
    +      assert(blockInfoManager.size === 10)
    +
    +
    +      // Add one big block, which will require evicting everything in the memorystore.  However our
    +      // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
    +      val largeBlockId = BlockId(s"rdd_2_1")
    +      val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
    +      val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
    +      assert(initialWriteLock)
    +      if (failAfterDroppingNBlocks < 10) {
    +        val exc = intercept[RuntimeException] {
    +          memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
    +            new ChunkedByteBuffer(ByteBuffer.allocate(100))
    +          })
    +        }
    +        assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
    +        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
    +        // testing that here, so do it manually
    +        blockInfoManager.removeBlock(largeBlockId)
    +      } else {
    +        memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
    +          new ChunkedByteBuffer(ByteBuffer.allocate(100))
    +        })
    +        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
    +        // testing that here, so do it manually
    +        blockInfoManager.unlock(largeBlockId)
    +      }
    +
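    +      // Expected entries left in the BlockInfoManager: the 10 initial blocks, minus the
    +      // dropped blocks that were fully removed (with readLockAfterDrop the dropped blocks
    +      // stay registered because "another thread" still holds a read lock on them), plus
    +      // the large block if its put succeeded.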
    +      val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
    +      val expBlocks = 10 +
    +        (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) +
    +        largeBlockInMemory
    +      assert(blockInfoManager.size === expBlocks)
    +
    +      val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) =>
    +        assert(info.writerTask === BlockInfo.NO_WRITER, id)
    +        // in this test, all the blocks in memory have no reader, but everything dropped to disk
    +        // had another thread read the block.  We shouldn't lose the other thread's reader lock.
    --- End diff --
    
    Thanks for clarifying @squito ... I was assuming the test was for something along those lines, but good to know I did not misunderstand!
    This patch looks great.
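    For anyone skimming the thread later: a minimal, self-contained sketch (hypothetical names, not the actual MemoryStore patch) of the lock-cleanup pattern this test exercises -- write locks taken for eviction get released in a finally block, so an exception while dropping one block does not leak the locks on the remaining ones:
    
    ```scala
    // Illustrative only: a stand-in lock manager, not Spark's BlockInfoManager.
    import scala.collection.mutable
    
    object EvictionLockSketch {
      final case class BlockId(name: String)
    
      class LockManager {
        private val writeLocked = mutable.Set[BlockId]()
        def lockForWriting(id: BlockId): Unit = { writeLocked += id }
        def unlock(id: BlockId): Unit = { writeLocked -= id }
        def heldWriteLocks: Set[BlockId] = writeLocked.toSet
      }
    
      // Drop each selected block; if a drop throws, the finally block releases the write
      // locks of the blocks that were never reached, so no lock is leaked.
      def evictAll(selected: Seq[BlockId], locks: LockManager)(drop: BlockId => Unit): Unit = {
        selected.foreach(locks.lockForWriting)
        var lastSuccessful = -1
        try {
          selected.zipWithIndex.foreach { case (id, idx) =>
            drop(id)          // may throw, like the mock handler in the test above
            locks.unlock(id)  // lock of a successfully dropped block is released here
            lastSuccessful = idx
          }
        } finally {
          // release the locks we took for blocks we never got to drop
          (lastSuccessful + 1 until selected.size).foreach(i => locks.unlock(selected(i)))
        }
      }
    
      def main(args: Array[String]): Unit = {
        val locks = new LockManager
        val blocks = (1 to 5).map(i => BlockId(s"rdd_1_$i"))
        try {
          evictAll(blocks, locks) { id =>
            if (id.name.endsWith("3")) throw new RuntimeException(s"mock error dropping $id")
          }
        } catch { case _: RuntimeException => () }
        assert(locks.heldWriteLocks.isEmpty, "all write locks should have been released")
      }
    }
    ```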


---
