Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19311#discussion_r140379012
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -544,20 +544,39 @@ private[spark] class MemoryStore(
           }
     
           if (freedMemory >= space) {
    -        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
    -          s"(${Utils.bytesToString(freedMemory)} bytes)")
    -        for (blockId <- selectedBlocks) {
    -          val entry = entries.synchronized { entries.get(blockId) }
    -          // This should never be null as only one task should be dropping
    -          // blocks and removing entries. However the check is still here for
    -          // future safety.
    -          if (entry != null) {
    -            dropBlock(blockId, entry)
    +        var exceptionWasThrown: Boolean = true
    +        try {
    +          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
    +            s"(${Utils.bytesToString(freedMemory)} bytes)")
    +          for (blockId <- selectedBlocks) {
    +            val entry = entries.synchronized {
    +              entries.get(blockId)
    +            }
    +            // This should never be null as only one task should be dropping
    +            // blocks and removing entries. However the check is still here for
    +            // future safety.
    +            if (entry != null) {
    +              dropBlock(blockId, entry)
    +            }
    +          }
    +          exceptionWasThrown = false
    +          logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
    +            s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
    +          freedMemory
    +        } finally {
    +          // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
    +          // with InterruptedException
    +          if (exceptionWasThrown) {
    +            selectedBlocks.foreach { id =>
    +              // some of the blocks may have already been unlocked, or completely removed
    +              blockInfoManager.get(id).foreach { info =>
    --- End diff --
    
    This feels racy. Let's say you're dropping 10 blocks here.
    
    You try to drop the first one, but `newEffectiveStorageLevel.isValid` is true, so you just unlock the block. Then you get to this cleanup code some time later, but some other thread has since locked that first block. Aren't you going to release a lock that you don't actually own?
    
    I think you'd need to keep track of which blocks have successfully been processed by `dropBlock` instead of doing this.
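    
    To illustrate, here is a minimal, self-contained sketch of that suggestion. The names (`DropSketch`, `remainingAfterFailure`, `failOn`) are hypothetical and this is not Spark's actual API; the point is that a block is recorded as processed only after `dropBlock` returns, so the cleanup path touches only the blocks whose locks this task still holds:
    
    ```scala
    import scala.collection.mutable
    
    object DropSketch {
      type BlockId = String
    
      // Simulate dropping each selected block in order; `failOn` marks the
      // block whose drop throws. Returns the blocks that were never
      // processed, i.e. the only ones whose locks cleanup should release.
      def remainingAfterFailure(selected: Seq[BlockId], failOn: BlockId): Seq[BlockId] = {
        val processed = mutable.Set.empty[BlockId]
        try {
          for (id <- selected) {
            if (id == failOn) throw new RuntimeException(s"failed dropping $id")
            processed += id // record only after a successful drop
          }
          Seq.empty
        } catch {
          case _: RuntimeException =>
            // Blocks in `processed` have already released (or transferred)
            // their lock; touching them again would be the race above.
            selected.filterNot(processed)
        }
      }
    }
    ```
    
    With this bookkeeping, a block that was already unlocked by a successful `dropBlock` is never revisited, so the cleanup can't release a lock that another thread has acquired in the meantime.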

