Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r163768689
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -232,78 +236,93 @@ private[spark] class MemoryStore(
           elementsUnrolled += 1
         }
     
    +    val valuesBuilder = if (keepUnrolling) {
    +      Some(valuesHolder.getBuilder())
    +    } else {
    +      None
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this 
point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
    +    // perform one final call to attempt to allocate additional memory if 
necessary.
         if (keepUnrolling) {
    -      // We successfully unrolled the entirety of this block
    -      val arrayValues = vector.toArray
    -      vector = null
    -      val entry =
    -        new DeserializedMemoryEntry[T](arrayValues, 
SizeEstimator.estimate(arrayValues), classTag)
    -      val size = entry.size
    -      def transferUnrollToStorage(amount: Long): Unit = {
    -        // Synchronize so that transfer is atomic
    -        memoryManager.synchronized {
    -          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
    -          val success = memoryManager.acquireStorageMemory(blockId, 
amount, MemoryMode.ON_HEAP)
    -          assert(success, "transferring unroll memory to storage memory 
failed")
    +      val size = valuesBuilder.get.preciseSize
    +      if (size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
             }
           }
    -      // Acquire storage memory if necessary to store this block in memory.
    -      val enoughStorageMemory = {
    -        if (unrollMemoryUsedByThisBlock <= size) {
    -          val acquiredExtra =
    -            memoryManager.acquireStorageMemory(
    -              blockId, size - unrollMemoryUsedByThisBlock, 
MemoryMode.ON_HEAP)
    -          if (acquiredExtra) {
    -            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
    -          }
    -          acquiredExtra
    -        } else { // unrollMemoryUsedByThisBlock > size
    -          // If this task attempt already owns more unroll memory than is 
necessary to store the
    -          // block, then release the extra memory that will not be used.
    -          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
    -          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
    -          transferUnrollToStorage(size)
    -          true
    -        }
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    updated



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to