Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914947
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in the memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In case of failure,
    +   *         return a handle which allows the caller to either finish the serialization by
    +   *         spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by
    +   *         the partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        // Only count the new reservation if it was actually granted
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    This is actually here on purpose and deserves a comment. The goal is to make sure
    that by the time we reach line 317 we are guaranteed to have enough memory to store
    the block. When we finish serializing the block and reach line 311, the actual
    memory usage may have slightly exceeded our unroll memory reservation, so here we
    do one final bump of the unroll memory.
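    
    To illustrate the pattern (this is not the actual MemoryStore code: the
    `UnrollMemoryTracker` and `unrollSafely` names below are hypothetical stand-ins,
    and a plain `ByteArrayOutputStream` replaces the chunked, compressed
    serialization stream), here is a minimal sketch of the reserve-as-you-go loop
    plus the final bump:
    
    ```scala
    import java.io.ByteArrayOutputStream
    
    // Hypothetical stand-in for the per-task memory manager: grants
    // reservation requests until a fixed limit is exhausted.
    class UnrollMemoryTracker(limit: Long) {
      private var reserved = 0L
      def reserve(bytes: Long): Boolean =
        if (reserved + bytes <= limit) { reserved += bytes; true } else false
      def reservedBytes: Long = reserved
    }
    
    // Write items one at a time, reserving more unroll memory whenever the
    // buffer outgrows what has been reserved so far. Returns Right(size) on
    // success and Left(bytesWrittenSoFar) on failure (a stand-in for the
    // real PartiallySerializedBlock handle).
    def unrollSafely(
        items: Iterator[Array[Byte]],
        tracker: UnrollMemoryTracker,
        initialReservation: Long): Either[Long, Long] = {
      val buffer = new ByteArrayOutputStream()
      var keepUnrolling = tracker.reserve(initialReservation)
    
      def reserveAdditionalMemoryIfNecessary(): Unit = {
        if (buffer.size > tracker.reservedBytes) {
          keepUnrolling = tracker.reserve(buffer.size - tracker.reservedBytes)
        }
      }
    
      while (items.hasNext && keepUnrolling) {
        buffer.write(items.next())
        reserveAdditionalMemoryIfNecessary()
      }
    
      if (keepUnrolling) {
        // Final bump: closing a real serialization stream can flush trailing
        // bytes (e.g. end-of-stream markers), so check once more before
        // treating the reservation as large enough to cover the stored block.
        buffer.close()
        reserveAdditionalMemoryIfNecessary()
      }
    
      if (keepUnrolling) Right(buffer.size.toLong) else Left(buffer.size.toLong)
    }
    ```
    
    In the real patch the failure handle is a `PartiallySerializedBlock` rather than
    a byte count, and reservations go through `reserveUnrollMemoryForThisTask`, but
    the shape of the final check is the same: without it, the transfer to storage
    memory could account for fewer bytes than the block actually occupies.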

