This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aea5f50 [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration aea5f50 is described below commit aea5f506463c19fac97547ba7a28f9dd491e3a6a Author: Liupengcheng <liupengch...@xiaomi.com> AuthorDate: Fri Feb 1 13:47:14 2019 +0800 [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration ## What changes were proposed in this pull request? Currently, Spark does not release a `ShuffleBlockFetcherIterator` until the whole task has finished. Under some conditions, this causes a memory leak. An example is `rdd.repartition(m).coalesce(n, shuffle = false).save`: each `ShuffleBlockFetcherIterator` holds metadata about the map statuses (`blocksByAddress`), and each result task keeps n (at most the number of shuffle partitions) `ShuffleBlockFetcherIterator` instances. Their memory is never released until the task completes, because they are referenced by the completion callbacks of the TaskContext. In some cases, this can consume a huge amount of memory and cause an OOM. In fact, we can release a `ShuffleBlockFetcherIterator` as soon as it has been consumed. This PR resolves this problem. ## How was this patch tested? Unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #23438 from liupc/Fast-release-shuffleblockfetcheriterator. 
Lead-authored-by: Liupengcheng <liupengch...@xiaomi.com> Co-authored-by: liupengcheng <liupengch...@xiaomi.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/shuffle/BlockStoreShuffleReader.scala | 2 +- .../storage/ShuffleBlockFetcherIterator.scala | 34 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index daafe30..c5eefc7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS), SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM), SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT), - readMetrics) + readMetrics).toCompletionIterator val serializerInstance = dep.serializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f73c21b..3966980 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -31,7 +31,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.util.TransportConf import org.apache.spark.shuffle.{FetchFailedException, ShuffleReadMetricsReporter} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils} import org.apache.spark.util.io.ChunkedByteBufferOutputStream /** @@ -160,6 +160,8 @@ final class ShuffleBlockFetcherIterator( @GuardedBy("this") private[this] 
val shuffleFilesSet = mutable.HashSet[DownloadFile]() + private[this] val onCompleteCallback = new ShuffleFetchCompletionListener(this) + initialize() // Decrements the buffer reference count. @@ -192,7 +194,7 @@ final class ShuffleBlockFetcherIterator( /** * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet. */ - private[this] def cleanup() { + private[storage] def cleanup() { synchronized { isZombie = true } @@ -364,7 +366,7 @@ final class ShuffleBlockFetcherIterator( private[this] def initialize(): Unit = { // Add a task completion callback (called in both success case and failure case) to cleanup. - context.addTaskCompletionListener[Unit](_ => cleanup()) + context.addTaskCompletionListener(onCompleteCallback) // Split local and remote blocks. val remoteRequests = splitLocalRemoteBlocks() @@ -509,6 +511,11 @@ final class ShuffleBlockFetcherIterator( (currentResult.blockId, new BufferReleasingInputStream(input, this)) } + def toCompletionIterator: Iterator[(BlockId, InputStream)] = { + CompletionIterator[(BlockId, InputStream), this.type](this, + onCompleteCallback.onComplete(context)) + } + private def fetchUpToMaxBytes(): Unit = { // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host // immediately, defer the request until the next time it can be processed. 
@@ -609,6 +616,27 @@ private class BufferReleasingInputStream( override def reset(): Unit = delegate.reset() } +/** + * A listener to be called at the completion of the ShuffleBlockFetcherIterator + * @param data the ShuffleBlockFetcherIterator to process + */ +private class ShuffleFetchCompletionListener(var data: ShuffleBlockFetcherIterator) + extends TaskCompletionListener { + + override def onTaskCompletion(context: TaskContext): Unit = { + if (data != null) { + data.cleanup() + // Null out the referent here to make sure we don't keep a reference to this + // ShuffleBlockFetcherIterator, after we're done reading from it, to let it be + // collected during GC. Otherwise we can hold metadata on block locations(blocksByAddress) + data = null + } + } + + // Just an alias for onTaskCompletion to avoid confusing + def onComplete(context: TaskContext): Unit = this.onTaskCompletion(context) +} + private[storage] object ShuffleBlockFetcherIterator { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org