Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/791#discussion_r12840665
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -166,45 +166,51 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
           size: Long,
           deserialized: Boolean): ResultWithDroppedBlocks = {
     
    -    /* TODO: Its possible to optimize the locking by locking entries only 
when selecting blocks
    -     * to be dropped. Once the to-be-dropped blocks have been selected, 
and lock on entries has
    -     * been released, it must be ensured that those to-be-dropped blocks 
are not double counted
    -     * for freeing up more space for another block that needs to be put. 
Only then the actually
    -     * dropping of blocks (and writing to disk if necessary) can proceed 
in parallel. */
    -
         var putSuccess = false
         val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
     
    -    putLock.synchronized {
    -      val freeSpaceResult = ensureFreeSpace(blockId, size)
    -      val enoughFreeSpace = freeSpaceResult.success
    -      droppedBlocks ++= freeSpaceResult.droppedBlocks
     
    -      if (enoughFreeSpace) {
    -        val entry = new Entry(value, size, deserialized)
    -        entries.synchronized {
    -          entries.put(blockId, entry)
    -          currentMemory += size
    +    val toBeDroppedBlocks = putLock.synchronized { 
ensureFreeSpace(blockId, size) }
    +    if (toBeDroppedBlocks.isDefined) {
    +      try {
    +        toBeDroppedBlocks.get.foreach { block =>
    +          val droppedBlockStatus = blockManager.dropFromMemory(block.id, 
block.data)
    +          droppedBlockStatus.map { status => droppedBlocks += blockId -> 
status }
             }
    -        if (deserialized) {
    -          logInfo("Block %s stored as values to memory (estimated size %s, 
free %s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    -        } else {
    -          logInfo("Block %s stored as bytes to memory (size %s, free 
%s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    +      } catch {
    +        // if task hit exception or cancelled by executor, then reset 
dropping flags of selected blocks
    +        case e: Exception => {
    +          toBeDroppedBlocks.get.foreach { block =>
    +            val entry = entries.synchronized{ entries.get(block.id) }
    +            if (entry != null) entry.dropping = false
    --- End diff --
    
    You are modifying entry.dropping here - there is no guarantee this change 
will be visible to other threads anytime soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to