Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11613#discussion_r55898638
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -100,48 +92,136 @@ private[spark] class MemoryStore(
        */
       def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): 
Boolean = {
         require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
    -    // Work on a duplicate - since the original input might be used 
elsewhere.
    -    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
    -    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = 
false)
    -    if (putSuccess) {
    +    if (memoryManager.acquireStorageMemory(blockId, size)) {
    +      // We acquired enough memory for the block, so go ahead and put it
    +      // Work on a duplicate - since the original input might be used 
elsewhere.
    +      val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
           assert(bytes.limit == size)
    +      val entry = new MemoryEntry(bytes, size, deserialized = false)
    +      entries.synchronized {
    +        entries.put(blockId, entry)
    +      }
    +      logInfo("Block %s stored as bytes in memory (estimated size %s, free 
%s)".format(
    +        blockId, Utils.bytesToString(size), 
Utils.bytesToString(blocksMemoryUsed)))
    +      true
    +    } else {
    +      false
         }
    -    putSuccess
       }
     
       /**
        * Attempt to put the given block in memory store.
        *
    -   * @return the estimated size of the stored data if the put() succeeded, 
or an iterator
    -   *         in case the put() failed (the returned iterator lets callers 
fall back to the disk
    -   *         store if desired).
    +   * @return in case of success, the estimated size of the 
stored data. In case of
    +   *         failure, return an iterator containing the values of the 
block. The returned iterator
    +   *         will be backed by the combination of the partially-unrolled 
block and the remaining
    +   *         elements of the original input iterator. The caller must 
either fully consume this
    +   *         iterator or call `close()` on it in order to free the storage 
memory consumed by the
    +   *         partially-unrolled block.
        */
       private[storage] def putIterator(
           blockId: BlockId,
           values: Iterator[Any],
    -      level: StorageLevel): Either[Iterator[Any], Long] = {
    +      level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
    +
         require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
    -    val unrolledValues = unrollSafely(blockId, values)
    -    unrolledValues match {
    -      case Left(arrayValues) =>
    -        // Values are fully unrolled in memory, so store them as an array
    -        if (level.deserialized) {
    -          val sizeEstimate = 
SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
    -          if (tryToPut(blockId, () => arrayValues, sizeEstimate, 
deserialized = true)) {
    -            Right(sizeEstimate)
    -          } else {
    -            Left(arrayValues.toIterator)
    +
    +    // It's possible that the iterator is too large to materialize and 
store in memory. To avoid
    +    // OOM exceptions, this method will gradually unroll the iterator 
while periodically checking
    +    // whether there is enough free memory.
    +
    +    // Number of elements unrolled so far
    +    var elementsUnrolled = 0
    +    // Whether there is still enough memory for us to continue unrolling 
this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // How often to check whether we need to request more memory
    +    val memoryCheckPeriod = 16
    +    // Memory currently reserved by this task for this particular 
unrolling operation
    +    var memoryThreshold = initialMemoryThreshold
    +    // Memory to request as a multiple of current vector size
    +    val memoryGrowthFactor = 1.5
    +    // Keep track of unroll memory used by this particular block / 
putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[Any]
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our 
threshold periodically
    +    while (values.hasNext && keepUnrolling) {
    +      vector += values.next()
    +      if (elementsUnrolled % memoryCheckPeriod == 0) {
    +        // If our vector's size has exceeded the threshold, request more 
memory
    +        val currentSize = vector.estimateSize()
    +        if (currentSize >= memoryThreshold) {
    +          val amountToRequest = (currentSize * memoryGrowthFactor - 
memoryThreshold).toLong
    +          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
    +          if (keepUnrolling) {
    +            unrollMemoryUsedByThisBlock += amountToRequest
               }
    +          // New threshold is currentSize * memoryGrowthFactor
    +          memoryThreshold += amountToRequest
    +        }
    +      }
    +      elementsUnrolled += 1
    +    }
    +
    +    if (keepUnrolling) {
    +      // We successfully unrolled the entirety of this block
    +      val arrayValues = vector.toArray
    +      vector = null
    +      val entry = if (level.deserialized) {
    +        val sizeEstimate = 
SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
    +        new MemoryEntry(arrayValues, sizeEstimate, deserialized = true)
    +      } else {
    +        val bytes = blockManager.dataSerialize(blockId, 
arrayValues.iterator)
    +        new MemoryEntry(bytes, bytes.limit, deserialized = false)
    +      }
    +      val size = entry.size
    +      val acquiredStorageMemory = {
    +        if (unrollMemoryUsedByThisBlock == size) {
    +          true
    +        } else if (unrollMemoryUsedByThisBlock < size) {
    +          memoryManager.acquireStorageMemory(blockId, size - 
unrollMemoryUsedByThisBlock)
             } else {
    -          val bytes = blockManager.dataSerialize(blockId, 
arrayValues.iterator)
    -          if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = 
false)) {
    -            Right(bytes.limit())
    -          } else {
    -            Left(arrayValues.toIterator)
    -          }
    +          memoryManager.releaseUnrollMemory(unrollMemoryUsedByThisBlock - 
size)
    --- End diff --
    
    Same here: it's better to release unroll memory in a single place (i.e. 
`releaseUnrollMemoryUsedByThisTask`) instead of in multiple places. Also, 
this call currently doesn't update `unrollMemoryMap`, which is a problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to