[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19311


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140815961
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
--- End diff --

I don't think `validBlock` captures the intent here -- I don't see anything 
valid or invalid about it either way.  The part of the behavior which changes 
is whether or not another thread grabs a reader lock on the block after it 
gets dropped to disk.

(To go along with that, we drop the block to disk, rather than just 
evicting it completely, as otherwise there is nothing to grab a lock on.  I 
could always drop the block to disk instead of having that depend on this 
flag; it just seemed like another useful thing to check -- whether the block 
count in `blockInfoManager` is updated correctly when the block is dropped 
completely.)
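
Concretely, the flag toggles between two scenarios the helper exercises. The 
invocations below are only illustrative (the actual calls in the patch are not 
quoted in this thread), but they show the two cases:

    // block is dropped to disk, and another "thread" immediately takes a read lock on it
    testFailureOnNthDrop(failAfterDroppingNBlocks = 2, readLockAfterDrop = true)
    // block is removed outright, so the block count in blockInfoManager should shrink
    testFailureOnNthDrop(failAfterDroppingNBlocks = 2, readLockAfterDrop = false)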


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140815652
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+  val tc = TaskContext.empty()
+  val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+  val blockInfoManager = new BlockInfoManager
+  blockInfoManager.registerTask(tc.taskAttemptId)
+  var droppedSoFar = 0
+  val blockEvictionHandler = new BlockEvictionHandler {
+var memoryStore: MemoryStore = _
+
+override private[storage] def dropFromMemory[T: ClassTag](
+blockId: BlockId,
+data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+  if (droppedSoFar < failAfterDroppingNBlocks) {
+droppedSoFar += 1
+memoryStore.remove(blockId)
+if (readLockAfterDrop) {
+  // for testing purposes, we act like another thread gets the read lock on the new
+  // block
+  StorageLevel.DISK_ONLY
+} else {
+  StorageLevel.NONE
+}
+  } else {
+throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+  }
+}
+  }
+  val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+  blockEvictionHandler) {
+override def afterDropAction(blockId: BlockId): Unit = {
+  if (readLockAfterDrop) {
+// pretend that we get a read lock on the block (now on disk) in another thread
+TaskContext.setTaskContext(tc)
+blockInfoManager.lockForReading(blockId)
+TaskContext.unset()
+  }
+}
+  }
+
+  blockEvictionHandler.memoryStore = memoryStore
+  memManager.setMemoryStore(memoryStore)
+
+  // Put in some small blocks to fill up the memory store
+  val initialBlocks = (1 to 10).map { id =>
+val blockId = BlockId(s"rdd_1_$id")
+val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+assert(initialWriteLock)
+val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => {
+  new ChunkedByteBuffer(ByteBuffer.allocate(10))
+})
+assert(success)
+blockInfoManager.unlock(blockId, None)
+  }
+  assert(blockInfoManager.size === 10)
+
+
+  // Add one big block, which will require evicting everything in the memorystore.  However our
+  // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
+  val largeBlockId = BlockId(s"rdd_2_1")
+  val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+  val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
+  assert(initialWriteLock)
+  if (failAfterDroppingNBlocks < 10) {
+val exc = intercept[RuntimeException] {
+  memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+new ChunkedByteBuffer(ByteBuffer.allocate(100))
+  })
+}
+assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
+// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+// testing that here, so do it manually
+blockInfoManager.removeBlock(largeBlockId)
+  } else {
+memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+  new ChunkedByteBuffer(ByteBuffer.allocate(100))
+})
+// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+// testing that here, so do it manually
+blockInfoManager.unlock(largeBlockId)
+  }
+
+  val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
+  val expBlocks = 10 +
+(if (re

[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140813788
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---

[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140651741
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---

[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140651608
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+  val tc = TaskContext.empty()
+  val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+  val blockInfoManager = new BlockInfoManager
+  blockInfoManager.registerTask(tc.taskAttemptId)
+  var droppedSoFar = 0
+  val blockEvictionHandler = new BlockEvictionHandler {
+var memoryStore: MemoryStore = _
+
+override private[storage] def dropFromMemory[T: ClassTag](
+blockId: BlockId,
+data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+  if (droppedSoFar < failAfterDroppingNBlocks) {
+droppedSoFar += 1
+memoryStore.remove(blockId)
+if (readLockAfterDrop) {
+  // for testing purposes, we act like another thread gets the read lock on the new
+  // block
+  StorageLevel.DISK_ONLY
+} else {
+  StorageLevel.NONE
+}
+  } else {
+throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+  }
+}
+  }
+  val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+  blockEvictionHandler) {
+override def afterDropAction(blockId: BlockId): Unit = {
+  if (readLockAfterDrop) {
+// pretend that we get a read lock on the block (now on disk) in another thread
+TaskContext.setTaskContext(tc)
+blockInfoManager.lockForReading(blockId)
+TaskContext.unset()
+  }
+}
+  }
+
+  blockEvictionHandler.memoryStore = memoryStore
+  memManager.setMemoryStore(memoryStore)
+
+  // Put in some small blocks to fill up the memory store
+  val initialBlocks = (1 to 10).map { id =>
--- End diff --

To piggyback on @vanzin's comment: `sizePerBlock` also, please (so that the 100 goes away)? Thx


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140651513
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request 
which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an 
exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, 
readLockAfterDrop: Boolean): Unit = {
--- End diff --

Nit: failAfterDroppingNBlocks -> numValidBlocks, readLockAfterDrop -> validBlock ?



---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140612177
  
--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+  val tc = TaskContext.empty()
+  val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
+  val blockInfoManager = new BlockInfoManager
+  blockInfoManager.registerTask(tc.taskAttemptId)
+  var droppedSoFar = 0
+  val blockEvictionHandler = new BlockEvictionHandler {
+var memoryStore: MemoryStore = _
+
+override private[storage] def dropFromMemory[T: ClassTag](
+blockId: BlockId,
+data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+  if (droppedSoFar < failAfterDroppingNBlocks) {
+droppedSoFar += 1
+memoryStore.remove(blockId)
+if (readLockAfterDrop) {
+  // for testing purposes, we act like another thread gets the read lock on the new
+  // block
+  StorageLevel.DISK_ONLY
+} else {
+  StorageLevel.NONE
+}
+  } else {
+throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+  }
+}
+  }
+  val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+  blockEvictionHandler) {
+override def afterDropAction(blockId: BlockId): Unit = {
+  if (readLockAfterDrop) {
+// pretend that we get a read lock on the block (now on disk) in another thread
+TaskContext.setTaskContext(tc)
+blockInfoManager.lockForReading(blockId)
+TaskContext.unset()
+  }
+}
+  }
+
+  blockEvictionHandler.memoryStore = memoryStore
+  memManager.setMemoryStore(memoryStore)
+
+  // Put in some small blocks to fill up the memory store
+  val initialBlocks = (1 to 10).map { id =>
--- End diff --

The logic looks fine, but I kinda dislike the magic number ("10") being used everywhere.

A constant would make this much better (`val initialBlocks = 10`).
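
For what it's worth, a rough sketch of how this suggestion and the `sizePerBlock` one above 
could look in the test helper. This is just an illustration of the review comments; the 
name `storeCapacity` is made up here and the final patch may use different constants:

    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
      val initialBlocks = 10                            // small blocks cached up front
      val sizePerBlock = 10                             // bytes per small block
      val storeCapacity = initialBlocks * sizePerBlock  // replaces the bare 100
      val memManager = new StaticMemoryManager(conf, Long.MaxValue, storeCapacity, numCores = 1)
      // ... rest of the test as quoted above, with the literals 10 and 100
      // replaced by initialBlocks, sizePerBlock, and storeCapacity ...
    }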


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140559549
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
   }
 
   if (freedMemory >= space) {
-logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-  s"(${Utils.bytesToString(freedMemory)} bytes)")
-for (blockId <- selectedBlocks) {
-  val entry = entries.synchronized { entries.get(blockId) }
-  // This should never be null as only one task should be dropping
-  // blocks and removing entries. However the check is still here for
-  // future safety.
-  if (entry != null) {
-dropBlock(blockId, entry)
+var exceptionWasThrown: Boolean = true
+try {
+  logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+s"(${Utils.bytesToString(freedMemory)} bytes)")
+  for (blockId <- selectedBlocks) {
+val entry = entries.synchronized {
+  entries.get(blockId)
+}
+// This should never be null as only one task should be dropping
+// blocks and removing entries. However the check is still here for
+// future safety.
+if (entry != null) {
+  dropBlock(blockId, entry)
+}
+  }
+  exceptionWasThrown = false
+  logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+  freedMemory
+} finally {
+  // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+  // with InterruptedException
+  if (exceptionWasThrown) {
+selectedBlocks.foreach { id =>
+  // some of the blocks may have already been unlocked, or completely removed
+  blockInfoManager.get(id).foreach { info =>
--- End diff --

good point, thanks, I've handled this now


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140379012
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
   }
 
   if (freedMemory >= space) {
-logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-  s"(${Utils.bytesToString(freedMemory)} bytes)")
-for (blockId <- selectedBlocks) {
-  val entry = entries.synchronized { entries.get(blockId) }
-  // This should never be null as only one task should be dropping
-  // blocks and removing entries. However the check is still here for
-  // future safety.
-  if (entry != null) {
-dropBlock(blockId, entry)
+var exceptionWasThrown: Boolean = true
+try {
+  logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+s"(${Utils.bytesToString(freedMemory)} bytes)")
+  for (blockId <- selectedBlocks) {
+val entry = entries.synchronized {
+  entries.get(blockId)
+}
+// This should never be null as only one task should be dropping
+// blocks and removing entries. However the check is still here for
+// future safety.
+if (entry != null) {
+  dropBlock(blockId, entry)
+}
+  }
+  exceptionWasThrown = false
+  logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+  freedMemory
+} finally {
+  // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+  // with InterruptedException
+  if (exceptionWasThrown) {
+selectedBlocks.foreach { id =>
+  // some of the blocks may have already been unlocked, or completely removed
+  blockInfoManager.get(id).foreach { info =>
--- End diff --

This feels racy. Let's say you're dropping 10 blocks here.

You try to drop the first one, but `newEffectiveStorageLevel.isValid` is true, 
so you just unlock the block. Then you get to this code some time later, but 
some other thread has locked that first block. Aren't you going to drop that 
lock which you don't really own?

I think you'd need to keep track of which blocks have successfully been 
processed by `dropBlock` instead of doing this.
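
A minimal sketch of that bookkeeping, written in terms of the names already visible in 
the quoted diff (this only illustrates the suggestion; the follow-up commit may differ): 
record how far `dropBlock` got, and in the `finally` block only unlock the blocks this 
task still holds.

    // Sketch: remember the index of the last block dropBlock fully processed, so the
    // cleanup path never releases a lock that was already given up (and possibly
    // re-acquired by another thread).
    var lastSuccessfulBlock = -1
    try {
      selectedBlocks.indices.foreach { idx =>
        val blockId = selectedBlocks(idx)
        val entry = entries.synchronized { entries.get(blockId) }
        if (entry != null) {
          dropBlock(blockId, entry)
        }
        lastSuccessfulBlock = idx
      }
      freedMemory
    } finally {
      if (lastSuccessfulBlock != selectedBlocks.size - 1) {
        // this task still holds the write locks for every block after lastSuccessfulBlock
        (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
          blockInfoManager.unlock(selectedBlocks(idx))
        }
      }
    }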


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-21 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19311

[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

## What changes were proposed in this pull request?

MemoryStore.evictBlocksToFreeSpace acquires write locks up front for all the
blocks it intends to evict.  If evicting fails partway through (e.g., some
failure dropping a block to disk), then we have to release those locks.
Otherwise the locks are never released, and an executor trying to acquire
one of them will wait forever.
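
As a toy illustration of that failure mode (using a plain `ReentrantReadWriteLock`
rather than the actual Spark lock bookkeeping): if the writer path throws before
unlocking and nothing cleans up, every later reader of that lock blocks forever,
which is what an executor waiting on the block's lock ends up doing.

    import java.util.concurrent.locks.ReentrantReadWriteLock

    val lock = new ReentrantReadWriteLock()
    lock.writeLock().lock()
    try {
      throw new RuntimeException("simulated failure while dropping a block to disk")
    } catch {
      case _: RuntimeException => ()  // bug: the write lock is never released here
    }
    // Any other thread that now calls lock.readLock().lock() waits forever.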

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-22083

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19311


commit 1ff270a1fdbd567965c6c721f0a92bc1b77bc240
Author: Imran Rashid 
Date:   2017-09-21T19:12:24Z

[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the
blocks it intends to evict up front.  If there is a failure to evict
blocks (eg., some failure dropping a block to disk), then we have to
release the lock.  Otherwise the lock is never released and an executor
trying to get the lock will wait forever.




---
