Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56914947 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -244,13 +244,113 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return a handle which allows the caller to either finish the serialization + * by spilling to disk or to deserialize the partially-serialized block and reconstruct + * the original input iterator. The caller must either fully consume this result + * iterator or call `discard()` on it in order to free the storage memory consumed by the + * partially-unrolled block. + */ + private[storage] def putIteratorAsBytes( + blockId: BlockId, + values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = { + + require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + + // Whether there is still enough memory for us to continue unrolling this block + var keepUnrolling = true + // Initial per-task memory to request for unrolling blocks (bytes). 
+ val initialMemoryThreshold = unrollMemoryThreshold + // Keep track of unroll memory used by this particular block / putIterator() operation + var unrollMemoryUsedByThisBlock = 0L + // Underlying buffer for unrolling the block + val redirectableStream = new RedirectableOutputStream + val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) + redirectableStream.setOutputStream(byteArrayChunkOutputStream) + val serializationStream: SerializationStream = { + val ser = blockManager.defaultSerializer.newInstance() + ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream)) + } + + // Request enough memory to begin unrolling + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + + if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold + } + + def reserveAdditionalMemoryIfNecessary(): Unit = { + if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { + val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest + } + unrollMemoryUsedByThisBlock += amountToRequest + } + } + + // Unroll this block safely, checking whether we have exceeded our threshold + while (values.hasNext && keepUnrolling) { + serializationStream.writeObject(values.next()) + reserveAdditionalMemoryIfNecessary() + } + + if (keepUnrolling) { --- End diff -- This is actually here on purpose and deserves a comment. The goal here is to make sure that once we reach line 317 we are guaranteed to have enough memory to store the block. 
When we finish serializing the block and reach line 311, the actual memory usage may have slightly exceeded the unroll memory we reserved, so here we bump up the unroll memory reservation one final time.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org