Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r162802949
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -261,37 +263,93 @@ private[spark] class MemoryStore(
               // If this task attempt already owns more unroll memory than is 
necessary to store the
               // block, then release the extra memory that will not be used.
               val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
    -          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
    +          releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
               transferUnrollToStorage(size)
               true
             }
           }
    +
           if (enoughStorageMemory) {
             entries.synchronized {
    -          entries.put(blockId, entry)
    +          entries.put(blockId, createMemoryEntry())
             }
             logInfo("Block %s stored as values in memory (estimated size %s, 
free %s)".format(
               blockId, Utils.bytesToString(size), 
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
             Right(size)
           } else {
             assert(currentUnrollMemoryForThisTask >= 
unrollMemoryUsedByThisBlock,
               "released too much unroll memory")
    +        Left(unrollMemoryUsedByThisBlock)
    +      }
    +    } else {
    +      Left(unrollMemoryUsedByThisBlock)
    +    }
    +  }
    +
    +  /**
    +   * Attempt to put the given block in memory store as values.
    +   *
    +   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
    +   * whether there is enough free memory. If the block is successfully 
materialized, then the
    +   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the 
block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In 
case of failure, return
    +   *         an iterator containing the values of the block. The returned 
iterator will be backed
    +   *         by the combination of the partially-unrolled block and the 
remaining elements of the
    +   *         original input iterator. The caller must either fully consume 
this iterator or call
    +   *         `close()` on it in order to free the storage memory consumed 
by the partially-unrolled
    +   *         block.
    +   */
    +  private[storage] def putIteratorAsValues[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = 
{
    +
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[T]()(classTag)
    +    var arrayValues: Array[T] = null
    +    var preciseSize: Long = -1
    +
    +    def storeValue(value: T): Unit = {
    +      vector += value
    +    }
    +
    +    def estimateSize(precise: Boolean): Long = {
    +      if (precise) {
    +        // We only call need the precise size after all values unrolled.
    +        arrayValues = vector.toArray
    +        preciseSize = SizeEstimator.estimate(arrayValues)
    +        preciseSize
    +      } else {
    +        vector.estimateSize()
    +      }
    +    }
    +
    +    def createMemoryEntry(): MemoryEntry[T] = {
    +      // We successfully unrolled the entirety of this block
    +      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
    +    }
    +
    +    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
    +      estimateSize, createMemoryEntry) match {
    +      case Right(storedSize) => Right(storedSize)
    +      case Left(unrollMemoryUsedByThisBlock) =>
    +        // We ran out of space while unrolling the values for this block
    +        val (unrolledIterator, size) = if (vector != null) {
    --- End diff --
    
    updated, thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to