Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/791#discussion_r12840665 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -166,45 +166,51 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) size: Long, deserialized: Boolean): ResultWithDroppedBlocks = { - /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks - * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has - * been released, it must be ensured that those to-be-dropped blocks are not double counted - * for freeing up more space for another block that needs to be put. Only then the actually - * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - var putSuccess = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - putLock.synchronized { - val freeSpaceResult = ensureFreeSpace(blockId, size) - val enoughFreeSpace = freeSpaceResult.success - droppedBlocks ++= freeSpaceResult.droppedBlocks - if (enoughFreeSpace) { - val entry = new Entry(value, size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - currentMemory += size + val toBeDroppedBlocks = putLock.synchronized { ensureFreeSpace(blockId, size) } + if (toBeDroppedBlocks.isDefined) { + try { + toBeDroppedBlocks.get.foreach { block => + val droppedBlockStatus = blockManager.dropFromMemory(block.id, block.data) + droppedBlockStatus.map { status => droppedBlocks += blockId -> status } } - if (deserialized) { - logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - } else { - logInfo("Block %s stored as bytes to memory (size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) + } catch { + // if task hit exception or cancelled by executor, then reset dropping flags of selected 
blocks + case e: Exception => { + toBeDroppedBlocks.get.foreach { block => + val entry = entries.synchronized{ entries.get(block.id) } + if (entry != null) entry.dropping = false --- End diff -- You are modifying entry.dropping here, after the entries.synchronized block has already exited. Unless the field is @volatile (or written under the same lock its readers hold), there is no guarantee this change will be visible to other threads anytime soon.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---