Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r149379715 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -261,37 +259,97 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") + Left(unrollMemoryUsedByThisBlock) + } + } else { + Left(unrollMemoryUsedByThisBlock) + } + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. 
The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[T]()(classTag) + var arrayValues: Array[T] = null + var preciseSize: Long = -1 + + def storeValue(value: T): Unit = { + vector += value + } + + def estimateSize(precise: Boolean): Long = { + if (precise) { + // We only need the precise size after all values are unrolled. + arrayValues = vector.toArray + preciseSize = SizeEstimator.estimate(arrayValues) + vector = null + preciseSize + } else { + vector.estimateSize() + } + } + + def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + assert(arrayValues != null, "arrayValue shouldn't be null!") + assert(preciseSize != -1, "preciseSize shouldn't be -1") --- End diff -- Under which condition would `preciseSize` be `-1`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org