GitHub user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/791#discussion_r12937647
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -166,69 +169,74 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
           size: Long,
           deserialized: Boolean): ResultWithDroppedBlocks = {
     
    -    /* TODO: Its possible to optimize the locking by locking entries only 
when selecting blocks
    -     * to be dropped. Once the to-be-dropped blocks have been selected, 
and lock on entries has
    -     * been released, it must be ensured that those to-be-dropped blocks 
are not double counted
    -     * for freeing up more space for another block that needs to be put. 
Only then the actually
    -     * dropping of blocks (and writing to disk if necessary) can proceed 
in parallel. */
    -
         var putSuccess = false
         val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    -
    -    putLock.synchronized {
    -      val freeSpaceResult = ensureFreeSpace(blockId, size)
    -      val enoughFreeSpace = freeSpaceResult.success
    -      droppedBlocks ++= freeSpaceResult.droppedBlocks
    -
    -      if (enoughFreeSpace) {
    -        val entry = new Entry(value, size, deserialized)
    -        entries.synchronized {
    -          entries.put(blockId, entry)
    -          currentMemory += size
    +    val result = getToBeDroppedBlocks(blockId, size)
    +    if (result.isDefined) {
    +      val toBeDroppedBlocks = result.get
    +      var droppingDoneCount = 0
    +      try {
    +        toBeDroppedBlocks.foreach { block =>
    +          val droppedBlockStatus = blockManager.dropFromMemory(block.id, 
block.data)
    +          droppedBlockStatus.map { status => droppedBlocks += blockId -> 
status }
    +          droppingDoneCount += 1
             }
    -        if (deserialized) {
    -          logInfo("Block %s stored as values to memory (estimated size %s, 
free %s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    -        } else {
    -          logInfo("Block %s stored as bytes to memory (size %s, free 
%s)".format(
    -            blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
    +      } finally {
    +        if (droppingDoneCount < toBeDroppedBlocks.size) {
    +          toBeDroppedBlocks.drop(droppingDoneCount).foreach { block =>
    +            entries.synchronized{
    +              val entry = entries.get(block.id)
    +              if (entry != null) entry.dropping = false
    +            }
    +          }
             }
    -        putSuccess = true
    +      }
    +
    +      val entry = new Entry(value, size, deserialized)
    +      entries.synchronized {
    +        entries.put(blockId, entry)
    +        currentMemory += size
    +      }
    +      if (deserialized) {
    +        logInfo("Block %s stored as values to memory (estimated size %s, 
free %s)".format(
    +          blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
           } else {
    -        // Tell the block manager that we couldn't put it in memory so 
that it can drop it to
    -        // disk if the block allows disk storage.
    -        val data = if (deserialized) {
    -          Left(value.asInstanceOf[ArrayBuffer[Any]])
    -        } else {
    -          Right(value.asInstanceOf[ByteBuffer].duplicate())
    -        }
    -        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
    -        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, 
status)) }
    +        logInfo("Block %s stored as bytes to memory (size %s, free 
%s)".format(
    +          blockId, Utils.bytesToString(size), 
Utils.bytesToString(freeMemory)))
           }
    +      putSuccess = true
    +    } else {
    +      // Tell the block manager that we couldn't put it in memory so that 
it can drop it to
    +      // disk if the block allows disk storage.
    +      val data = if (deserialized) {
    +        Left(value.asInstanceOf[ArrayBuffer[Any]])
    +      } else {
    +        Right(value.asInstanceOf[ByteBuffer].duplicate())
    +      }
    +      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
    +      droppedBlockStatus.map { status => droppedBlocks += blockId -> 
status }
         }
         ResultWithDroppedBlocks(putSuccess, droppedBlocks)
       }
     
       /**
    -   * Try to free up a given amount of space to store a particular block, 
but can fail if
    -   * either the block is bigger than our memory or it would require 
replacing another block
    -   * from the same RDD (which leads to a wasteful cyclic replacement 
pattern for RDDs that
    -   * don't fit into memory that we want to avoid).
    -   *
    -   * Assume that a lock is held by the caller to ensure only one thread is 
dropping blocks.
    -   * Otherwise, the freed space may fill up before the caller puts in 
their new value.
    +   * Try to select some blocks which will free up a given amount of space 
to store a
    +   * particular block, but can fail if either the block is bigger than our 
memory or
    +   * it would require selecting another block from the same RDD (which 
leads to a wasteful
    +   * cyclic replacement pattern for RDDs that don't fit into memory that 
we want to avoid).
        *
    -   * Return whether there is enough free space, along with the blocks 
dropped in the process.
    +   * Return None if there is no enough free space, or 
Some[List[ToBeDroppedBlock]] if space is
    +   * enough and tell the caller which blocks need to be dropped
        */
    -  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): 
ResultWithDroppedBlocks = {
    +  private def getToBeDroppedBlocks(blockIdToAdd: BlockId, space: Long): 
Option[Seq[ToBeDroppedBlock]] = {
    --- End diff --
    
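    Instead of 'get', can you rename it to 'find' or something similar? The method searches for blocks to evict and may return None, which a 'get' prefix does not suggest.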


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to