Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r17581688
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -141,6 +193,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }
     
       /**
    +   * Unroll the given block in memory safely.
    +   *
    +   * The safety of this operation refers to avoiding potential OOM exceptions caused by
    +   * unrolling the entirety of the block in memory at once. This is achieved by periodically
    +   * checking whether the memory restrictions for unrolling blocks are still satisfied,
    +   * stopping immediately if not. This check is a safeguard against the scenario in which
    +   * there is not enough free memory to accommodate the entirety of a single block.
    +   *
    +   * This method returns either an array with the contents of the entire block or an iterator
    +   * containing the values of the block (if the array would have exceeded available memory).
    +   */
    +  def unrollSafely(
    +      blockId: BlockId,
    +      values: Iterator[Any],
    +      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
    +    : Either[Array[Any], Iterator[Any]] = {
    +
    +    // Number of elements unrolled so far
    +    var elementsUnrolled = 0
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
    +    val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
    +    // How often to check whether we need to request more memory
    +    val memoryCheckPeriod = 16
    +    // Memory currently reserved by this thread for this particular unrolling operation
    +    var memoryThreshold = initialMemoryThreshold
    +    // Memory to request as a multiple of current vector size
    +    val memoryGrowthFactor = 1.5
    +    // Previous unroll memory held by this thread, for releasing later (only at the very end)
    +    val previousMemoryReserved = currentUnrollMemoryForThisThread
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[Any]
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    +    try {
    +      while (values.hasNext && keepUnrolling) {
    +        vector += values.next()
    +        if (elementsUnrolled % memoryCheckPeriod == 0) {
    +          // If our vector's size has exceeded the threshold, request more memory
    +          val currentSize = vector.estimateSize()
    +          if (currentSize >= memoryThreshold) {
    +            val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
    +            // Hold the accounting lock, in case another thread concurrently puts a block that
    +            // takes up the unrolling space we just ensured here
    +            accountingLock.synchronized {
    +              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
    +                // If the first request is not granted, try again after ensuring free space
    +                // If there is still not enough space, give up and drop the partition
    +                val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
    --- End diff --
    
    Got it, thanks for the explanation!
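
For readers following the diff above, the reserve-and-grow pattern it implements can be sketched in a self-contained way. The `MemoryPool` class, the fixed 4-bytes-per-element size estimate, and all constants below are hypothetical stand-ins for Spark's per-thread unroll-memory accounting and `SizeTrackingVector`, not Spark's actual API:

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal sketch of periodic-check unrolling: buffer values, re-estimate the
// buffer's size every few elements, and request more memory incrementally,
// bailing out to an iterator the moment a reservation is denied.
object UnrollSketch {
  // Toy memory pool; the real MemoryStore guards reservations with a lock.
  class MemoryPool(var free: Long) {
    def reserve(bytes: Long): Boolean =
      if (bytes <= free) { free -= bytes; true } else false
  }

  // Left(materialized values) if everything fit within reserved memory,
  // Right(iterator over all values) if a reservation was ever denied.
  def unrollSafely(values: Iterator[Int], pool: MemoryPool): Either[ArrayBuffer[Int], Iterator[Int]] = {
    val initialThreshold = 16L // initial reservation in (toy) bytes
    val checkPeriod = 4        // re-estimate size every N elements
    val growthFactor = 1.5     // grow reservation toward this multiple of current size
    val buffer = new ArrayBuffer[Int]
    var threshold = initialThreshold
    var keepUnrolling = pool.reserve(initialThreshold)
    var elementsUnrolled = 0
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      elementsUnrolled += 1
      if (elementsUnrolled % checkPeriod == 0) {
        val currentSize = buffer.length * 4L // crude stand-in for estimateSize()
        if (currentSize >= threshold) {
          // Request only the increment needed to reach growthFactor * currentSize
          val amountToRequest = (currentSize * (growthFactor - 1)).toLong
          keepUnrolling = pool.reserve(amountToRequest)
          if (keepUnrolling) threshold = currentSize + amountToRequest
        }
      }
    }
    if (keepUnrolling) Left(buffer) else Right(buffer.iterator ++ values)
  }
}
```

Checking only every `checkPeriod` elements amortizes the cost of size estimation, while requesting memory in `growthFactor` increments (rather than all at once) is what lets the store stop early instead of hitting an OOM on a single oversized block.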

