Github user liyezhang556520 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/791#discussion_r17463714
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -230,17 +236,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           val selectedBlocks = new ArrayBuffer[BlockId]()
           var selectedMemory = 0L
     
    -      // This is synchronized to ensure that the set of entries is not changed
    -      // (because of getValue or getBytes) while traversing the iterator, as that
    -      // can lead to exceptions.
    -      entries.synchronized {
    -        val iterator = entries.entrySet().iterator()
    -        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
    -          val pair = iterator.next()
    -          val blockId = pair.getKey
    -          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
    -            selectedBlocks += blockId
    -            selectedMemory += pair.getValue.size
    +      // This lock ensures that the selection and marking for the to-be-dropped blocks
    +      // is done by only one thread at a time. Otherwise if one thread has selected some
    +      // blocks and going to mark them as dropping, another thread may select the same blocks
    +      // and mark them twice, which leads to incorrect calculation of free space.
    +      selectLock.synchronized {
    +
    +        // This is synchronized to ensure that the set of entries is not changed
    +        // (because of getValue or getBytes) while traversing the iterator, as that
    +        // can lead to exceptions.
    +        entries.synchronized {
    +          val iterator = entries.entrySet().iterator()
    +          while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
    +            val pair = iterator.next()
    +            val entry = pair.getValue
    +            if (!entry.dropping) {
    +              val blockId = pair.getKey
    +              if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
    +                selectedBlocks += blockId
    +                selectedMemory += entry.size
    +              }
    +            }
               }
             }
           }
    --- End diff --
    
    @cloud-fan @andrewor14 
    Hi cloud-fan, I think there will be a problem if you don't update `currentMemory`. Assume there are two threads. The first one acquires `selectLock`, finishes running, and releases the lock; at this point `currentMemory` has not been updated yet. Then the second thread acquires `selectLock` and sees the same value of `currentMemory` as the first thread did, so the free memory `freeMemory = maxMemory - currentMemory` is counted twice, once by each thread. This means the `selectedMemory` computed by the second thread is smaller than what it actually needs.
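    To make the double counting concrete, here is a small single-threaded replay of the interleaving described above. `Block`, `select`, the block sizes and the "projected memory" fix are all hypothetical; `select` only mirrors the loop condition from the diff, and the fix is one possible way for the first thread to publish its pending effect before releasing `selectLock`, not necessarily what the PR adopts.

```scala
final case class Block(id: String, size: Long)

// One selection pass modelled on the diff: walk blocks oldest-first, skip those
// already marked as dropping, stop once `space` bytes would be free.
def select(blocks: Seq[Block], dropping: Set[String], maxMemory: Long,
           currentMemory: Long, space: Long): Seq[Block] = {
  val candidates = blocks.iterator.filterNot(b => dropping.contains(b.id))
  val selected = Seq.newBuilder[Block]
  var selectedMemory = 0L
  while (maxMemory - (currentMemory - selectedMemory) < space && candidates.hasNext) {
    val b = candidates.next()
    selected += b
    selectedMemory += b.size
  }
  selected.result()
}

val maxMemory = 100L
val blocks = Seq(Block("a", 20), Block("b", 20), Block("c", 20), Block("d", 20), Block("e", 10))
val currentMemory = blocks.map(_.size).sum // 90 bytes used, 10 bytes free
val space = 30L                             // each thread wants to store 30 bytes

// Racy version: thread 2 reads the same, not-yet-updated currentMemory as thread 1,
// so both count the same 10 free bytes.
val t1 = select(blocks, Set.empty, maxMemory, currentMemory, space)
val t2 = select(blocks, t1.map(_.id).toSet, maxMemory, currentMemory, space)
val finalRacy = currentMemory - (t1 ++ t2).map(_.size).sum + 2 * space
println(s"racy: t1=${t1.map(_.id)} t2=${t2.map(_.id)} final=$finalRacy (max $maxMemory)")

// Hypothetical fix: thread 2 sees projected usage, i.e. current usage minus the
// blocks thread 1 will drop plus the space thread 1 will store.
val projected = currentMemory - t1.map(_.size).sum + space
val t2Fixed = select(blocks, t1.map(_.id).toSet, maxMemory, projected, space)
val finalFixed = currentMemory - (t1 ++ t2Fixed).map(_.size).sum + 2 * space
println(s"fixed: t2=${t2Fixed.map(_.id)} final=$finalFixed (max $maxMemory)")
```

    In the racy replay each thread drops a single 20-byte block and both then store 30 bytes, ending at 110 bytes against a 100-byte limit. With the projected value, the second thread drops two blocks and usage ends at 90 bytes.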
