Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19316#discussion_r140408246
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -387,11 +387,18 @@ private[spark] class MemoryStore(
         // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
         // perform one final call to attempt to allocate additional memory if necessary.
         if (keepUnrolling) {
    -      serializationStream.close()
    -      reserveAdditionalMemoryIfNecessary()
    +      serializationStream.flush()
    +      if (bbos.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
         }
     
         if (keepUnrolling) {
    +      serializationStream.close()
    --- End diff --
    
    Here, we should close the `serializationStream` only after the final
    memory check. The previous code closed the stream first and then
    requested the additional memory, so there was a potential problem: the
    request for enough memory could fail while the `serializationStream`
    was already closed.
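
    For illustration, here is a minimal, self-contained Scala sketch of the
    corrected ordering (flush, re-measure the buffer, reserve any shortfall,
    and only then close). The `MemoryPool` class, the pool size, and the
    `ByteArrayOutputStream` standing in for `ChunkedByteBufferOutputStream`
    are hypothetical stand-ins, not Spark's actual API; only the control
    flow mirrors the diff above.

        import java.io.ByteArrayOutputStream

        object UnrollOrderingSketch {
          // Hypothetical fixed-size pool standing in for Spark's MemoryManager.
          final class MemoryPool(var free: Long) {
            def reserve(bytes: Long): Boolean =
              if (bytes <= free) { free -= bytes; true } else false
          }

          def main(args: Array[String]): Unit = {
            val pool = new MemoryPool(free = 64L)   // assumed pool size
            val bbos = new ByteArrayOutputStream()  // stand-in for the chunked buffer
            var unrollMemoryUsedByThisBlock = 16L   // memory reserved so far
            var keepUnrolling = pool.reserve(unrollMemoryUsedByThisBlock)

            // Pretend we serialized 40 bytes of block data into the stream.
            bbos.write(new Array[Byte](40))

            if (keepUnrolling) {
              bbos.flush()  // flush first so bbos.size reflects every write
              if (bbos.size > unrollMemoryUsedByThisBlock) {
                val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
                keepUnrolling = pool.reserve(amountToRequest)
                if (keepUnrolling) {
                  unrollMemoryUsedByThisBlock += amountToRequest
                }
              }
            }

            if (keepUnrolling) {
              bbos.close()  // close only after the final reservation succeeded
              println(s"stored block using $unrollMemoryUsedByThisBlock bytes")
            } else {
              println("not enough unroll memory; stream is still open for fallback")
            }
          }
        }

    Closing only after the final reservation succeeds means the failure path
    still has a live stream to fall back on, which is exactly the ordering
    concern raised above.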


---
