GitHub user liyezhang556520 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/791#discussion_r17463714

    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -230,17 +236,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           val selectedBlocks = new ArrayBuffer[BlockId]()
           var selectedMemory = 0L
     
    -      // This is synchronized to ensure that the set of entries is not changed
    -      // (because of getValue or getBytes) while traversing the iterator, as that
    -      // can lead to exceptions.
    -      entries.synchronized {
    -        val iterator = entries.entrySet().iterator()
    -        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
    -          val pair = iterator.next()
    -          val blockId = pair.getKey
    -          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
    -            selectedBlocks += blockId
    -            selectedMemory += pair.getValue.size
    +      // This lock ensures that the selection and marking for the to-be-dropped blocks
    +      // is done by only one thread at a time. Otherwise if one thread has selected some
    +      // blocks and going to mark them as dropping, another thread may select the same blocks
    +      // and mark them twice, which leads to incorrect calculation of free space.
    +      selectLock.synchronized {
    +
    +        // This is synchronized to ensure that the set of entries is not changed
    +        // (because of getValue or getBytes) while traversing the iterator, as that
    +        // can lead to exceptions.
    +        entries.synchronized {
    +          val iterator = entries.entrySet().iterator()
    +          while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
    +            val pair = iterator.next()
    +            val entry = pair.getValue
    +            if (!entry.dropping) {
    +              val blockId = pair.getKey
    +              if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
    +                selectedBlocks += blockId
    +                selectedMemory += entry.size
    +              }
    +            }
               }
             }
           }
    --- End diff --

    @cloud-fan @andrewor14 Hi cloud-fan, I think there will be a problem if you don't update `currentMemory`.
    Assume there are two threads. The first one gets the `selectLock`, finishes running, and releases the lock; at this point `currentMemory` has not been updated. Then the second thread gets the `selectLock`, and the value of `currentMemory` it sees is the same as the one the first thread saw. So the `freeMemory = maxMemory - currentMemory` is used twice, once by each thread, which means the `selectedMemory` for the second thread is smaller than what it actually requires.
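
    To make the scenario concrete, here is a minimal single-threaded sketch that replays the two selections one after the other. It is not Spark's code: `Store`, `Entry`, `simulate`, and the numbers (100 total, 90 used, three blocks of 30, each caller asking for 40) are assumptions chosen for illustration, and blocks are marked as dropping inside the loop only for brevity.

```scala
object DoubleCountedFreeMemory {
  // Simplified stand-in for a MemoryStore entry (hypothetical, not Spark's class).
  final class Entry(val size: Long) { var dropping: Boolean = false }

  // currentMemory is a val on purpose: it is never updated between selections,
  // which is the situation described in the comment above.
  final class Store(val maxMemory: Long, val currentMemory: Long, sizes: Seq[Long]) {
    private val entries: Seq[Entry] = sizes.map(new Entry(_))

    // Mirrors the loop in the diff: skip entries already marked as dropping and
    // select blocks until enough space appears to be free.
    def select(space: Long): Long = {
      var selectedMemory = 0L
      val iterator = entries.iterator
      while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
        val entry = iterator.next()
        if (!entry.dropping) {
          entry.dropping = true
          selectedMemory += entry.size
        }
      }
      selectedMemory
    }
  }

  // Returns (selected by first caller, selected by second caller, free memory after both drops).
  def simulate(space: Long): (Long, Long, Long) = {
    val store = new Store(maxMemory = 100L, currentMemory = 90L, sizes = Seq(30L, 30L, 30L))
    val first = store.select(space)  // first thread runs to completion, releases the lock
    val second = store.select(space) // second thread sees the same currentMemory
    val freeAfterDrops = store.maxMemory - (store.currentMemory - first - second)
    (first, second, freeAfterDrops)
  }

  def main(args: Array[String]): Unit = {
    val space = 40L
    val (first, second, free) = simulate(space)
    println(s"first=$first second=$second freeAfterDrops=$free needed=${2 * space}")
  }
}
```

    With these assumed numbers each caller selects one 30-unit block, because both count the same 10 free units toward their own request. After both drops only 70 units are free, while the two callers together need 80.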