[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19311
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140815961

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
     })
     assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+    // Setup a memory store with many blocks cached, and then one request which leads to multiple
+    // blocks getting evicted. We'll make the eviction throw an exception, and make sure that
+    // all locks are released.
+    val ct = implicitly[ClassTag[Array[Byte]]]
+    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
--- End diff --

I don't think `validBlock` captures the intent here -- I don't see anything valid or invalid about it either way. The part of the behavior which changes is whether or not another thread grabs a reader lock on the block after it gets dropped to disk. (To go along with that, we drop the block to disk rather than just evicting it completely, as otherwise there is nothing to grab a lock on. I could always drop the block to disk instead of having that depend on this; it just seemed like another useful thing to check -- whether the block count in `blockInfoManager` was updated correctly when the block was dropped completely.)
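To make the two modes concrete: here is a hypothetical sketch of the end-state assertions each mode implies for a dropped block. The names (`readLockAfterDrop`, `blockInfoManager`, `blockId`) come from the test quoted above, but these exact assertions are an illustration, not the PR's code:

```scala
// Illustrative only -- what each mode of testFailureOnNthDrop implies:
//   readLockAfterDrop = true  -> the block was dropped to disk (DISK_ONLY) and
//                                another thread still holds a read lock on it
//   readLockAfterDrop = false -> the block was evicted entirely, so it
//                                disappears from blockInfoManager
if (readLockAfterDrop) {
  assert(blockInfoManager.get(blockId).isDefined)  // still tracked, now on disk
} else {
  assert(blockInfoManager.get(blockId).isEmpty)    // gone; block count went down
}
```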
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140815652

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
     })
     assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+    // Setup a memory store with many blocks cached, and then one request which leads to multiple
+    // blocks getting evicted. We'll make the eviction throw an exception, and make sure that
+    // all locks are released.
+    val ct = implicitly[ClassTag[Array[Byte]]]
+    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+      val tc = TaskContext.empty()
+      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+      val blockInfoManager = new BlockInfoManager
+      blockInfoManager.registerTask(tc.taskAttemptId)
+      var droppedSoFar = 0
+      val blockEvictionHandler = new BlockEvictionHandler {
+        var memoryStore: MemoryStore = _
+
+        override private[storage] def dropFromMemory[T: ClassTag](
+            blockId: BlockId,
+            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+          if (droppedSoFar < failAfterDroppingNBlocks) {
+            droppedSoFar += 1
+            memoryStore.remove(blockId)
+            if (readLockAfterDrop) {
+              // for testing purposes, we act like another thread gets the read lock on the new
+              // block
+              StorageLevel.DISK_ONLY
+            } else {
+              StorageLevel.NONE
+            }
+          } else {
+            throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+          }
+        }
+      }
+      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+          blockEvictionHandler) {
+        override def afterDropAction(blockId: BlockId): Unit = {
+          if (readLockAfterDrop) {
+            // pretend that we get a read lock on the block (now on disk) in another thread
+            TaskContext.setTaskContext(tc)
+            blockInfoManager.lockForReading(blockId)
+            TaskContext.unset()
+          }
+        }
+      }
+
+      blockEvictionHandler.memoryStore = memoryStore
+      memManager.setMemoryStore(memoryStore)
+
+      // Put in some small blocks to fill up the memory store
+      val initialBlocks = (1 to 10).map { id =>
+        val blockId = BlockId(s"rdd_1_$id")
+        val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+        val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+        assert(initialWriteLock)
+        val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(10))
+        })
+        assert(success)
+        blockInfoManager.unlock(blockId, None)
+      }
+      assert(blockInfoManager.size === 10)
+
+      // Add one big block, which will require evicting everything in the memorystore. However our
+      // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
+      val largeBlockId = BlockId(s"rdd_2_1")
+      val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+      val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
+      assert(initialWriteLock)
+      if (failAfterDroppingNBlocks < 10) {
+        val exc = intercept[RuntimeException] {
+          memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+            new ChunkedByteBuffer(ByteBuffer.allocate(100))
+          })
+        }
+        assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
+        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+        // testing that here, so do it manually
+        blockInfoManager.removeBlock(largeBlockId)
+      } else {
+        memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(100))
+        })
+        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+        // testing that here, so do it manually
+        blockInfoManager.unlock(largeBlockId)
+      }
+
+      val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
+      val expBlocks = 10 +
+        (if (re
[the archived message is truncated here; the reviewer's comment was not preserved]
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140813788

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
(quotes the same SPARK-22083 test hunk reproduced in full above)
[the archived message is truncated before the reviewer's comment]
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140651741

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
(quotes the same SPARK-22083 test hunk reproduced in full above)
[the archived message is truncated before the reviewer's comment]
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140651608

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
(same test hunk as quoted in full above, anchored at:)
+      // Put in some small blocks to fill up the memory store
+      val initialBlocks = (1 to 10).map { id =>
--- End diff --

To piggyback on @vanzin's comment: a `sizePerBlock` constant also, please (so that the 100 goes away)? Thx
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140651513

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
(same test hunk as quoted in full above, anchored at:)
+    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
--- End diff --

Nit: `failAfterDroppingNBlocks` -> `numValidBlocks`, `readLockAfterDrop` -> `validBlock`?
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140612177

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
(same test hunk as quoted in full above, anchored at:)
+      // Put in some small blocks to fill up the memory store
+      val initialBlocks = (1 to 10).map { id =>
--- End diff --

The logic looks fine, but I kinda dislike the magic number ("10") being used everywhere. A constant would make this much better (`val initialBlocks = 10`).
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140559549

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
     }

     if (freedMemory >= space) {
-      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-        s"(${Utils.bytesToString(freedMemory)} bytes)")
-      for (blockId <- selectedBlocks) {
-        val entry = entries.synchronized { entries.get(blockId) }
-        // This should never be null as only one task should be dropping
-        // blocks and removing entries. However the check is still here for
-        // future safety.
-        if (entry != null) {
-          dropBlock(blockId, entry)
+      var exceptionWasThrown: Boolean = true
+      try {
+        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+          s"(${Utils.bytesToString(freedMemory)} bytes)")
+        for (blockId <- selectedBlocks) {
+          val entry = entries.synchronized {
+            entries.get(blockId)
+          }
+          // This should never be null as only one task should be dropping
+          // blocks and removing entries. However the check is still here for
+          // future safety.
+          if (entry != null) {
+            dropBlock(blockId, entry)
+          }
+        }
+        exceptionWasThrown = false
+        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+        freedMemory
+      } finally {
+        // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+        // with InterruptedException
+        if (exceptionWasThrown) {
+          selectedBlocks.foreach { id =>
+            // some of the blocks may have already been unlocked, or completely removed
+            blockInfoManager.get(id).foreach { info =>
--- End diff --

good point, thanks, I've handled this now
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140379012

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
(same hunk as quoted in full above, anchored at:)
+          selectedBlocks.foreach { id =>
+            // some of the blocks may have already been unlocked, or completely removed
+            blockInfoManager.get(id).foreach { info =>
--- End diff --

This feels racy. Let's say you're dropping 10 blocks here. You try to drop the first one, but `newEffectiveStorageLevel.isValid` is true, so you just unlock the block. Then you get to this code some time later, but some other thread has locked that first block. Aren't you going to drop that lock, which you don't really own? I think you'd need to keep track of which blocks have successfully been processed by `dropBlock` instead of doing this.
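A minimal, self-contained sketch of the bookkeeping being suggested here: remember the last block `dropBlock` fully processed, and on failure release only the locks of the blocks it never touched. All of the names and signatures below are illustrative, not the code that ultimately went into the PR:

```scala
// Sketch: track the last index that drop() fully processed; on failure,
// release only the write locks of the blocks drop() never saw, avoiding
// the race of unlocking a block some other thread has since locked.
object ReleaseUnprocessedLocks {
  def evictAll(
      selected: IndexedSeq[String],
      drop: String => Unit,      // releases or downgrades the block's own lock
      unlock: String => Unit): Unit = {
    var lastSuccessful = -1
    try {
      selected.indices.foreach { idx =>
        drop(selected(idx))
        lastSuccessful = idx     // blocks after this index still hold our lock
      }
    } finally {
      if (lastSuccessful != selected.size - 1) {
        // drop() threw partway through: release exactly the locks that were
        // acquired up front but never handed off to drop()
        ((lastSuccessful + 1) until selected.size).foreach { idx =>
          unlock(selected(idx))
        }
      }
    }
  }
}
```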
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19311

[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

## What changes were proposed in this pull request?

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it intends to evict up front. If there is a failure to evict blocks (e.g., some failure dropping a block to disk), then we have to release the lock. Otherwise the lock is never released, and an executor trying to get the lock will wait forever.

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-22083

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19311.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19311

commit 1ff270a1fdbd567965c6c721f0a92bc1b77bc240
Author: Imran Rashid
Date: 2017-09-21T19:12:24Z

    [SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

    MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks
    it intends to evict up front. If there is a failure to evict blocks (e.g.,
    some failure dropping a block to disk), then we have to release the lock.
    Otherwise the lock is never released, and an executor trying to get the
    lock will wait forever.
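To illustrate the failure mode the description refers to, here is a toy model (none of these names are Spark's): a write lock taken up front and never released on the failure path leaves every later reader blocked.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Toy model of the bug: an "evictor" takes the block's write lock, fails
// while dropping the block, and never unlocks; a later reader then stalls.
object StuckLockDemo {
  def main(args: Array[String]): Unit = {
    val blockLock = new ReentrantReadWriteLock()

    val evictor = new Thread(() => {
      blockLock.writeLock().lock()      // lock acquired up front
      try {
        throw new RuntimeException("failed dropping block to disk")
      } catch {
        case _: RuntimeException => ()  // bug: no unlock on the failure path
      }
    })
    evictor.start()
    evictor.join()

    // Without a `finally { blockLock.writeLock().unlock() }` in the evictor,
    // a plain readLock().lock() here would hang forever; tryLock lets the
    // demo report the stuck lock instead.
    val gotIt = blockLock.readLock().tryLock()
    println(s"reader acquired lock: $gotIt")  // prints: reader acquired lock: false
  }
}
```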