This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aea5f50  [SPARK-26525][SHUFFLE] Fast release 
ShuffleBlockFetcherIterator on completion of the iteration
aea5f50 is described below

commit aea5f506463c19fac97547ba7a28f9dd491e3a6a
Author: Liupengcheng <liupengch...@xiaomi.com>
AuthorDate: Fri Feb 1 13:47:14 2019 +0800

    [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on 
completion of the iteration
    
    ## What changes were proposed in this pull request?
    
    Currently, Spark does not release a ShuffleBlockFetcherIterator until the 
whole task finishes. Under some conditions, this causes a memory leak.
    
    An example is `rdd.repartition(m).coalesce(n, shuffle = false).save`. Each 
`ShuffleBlockFetcherIterator` holds some metadata about the map statuses 
(`blocksByAddress`), and each result task keeps n (up to the number of shuffle 
partitions) `ShuffleBlockFetcherIterator` instances. Their memory is never 
released until the task completes, because they are referenced by the completion 
callbacks of the TaskContext. In some cases this can consume a huge amount of 
memory and cause an OOM.
    
    In fact, we can release a ShuffleBlockFetcherIterator as soon as it has been 
consumed.
    This PR resolves the problem by doing so.
    
    ## How was this patch tested?
    
    Unit tests.
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.
    
    Closes #23438 from liupc/Fast-release-shuffleblockfetcheriterator.
    
    Lead-authored-by: Liupengcheng <liupengch...@xiaomi.com>
    Co-authored-by: liupengcheng <liupengch...@xiaomi.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/shuffle/BlockStoreShuffleReader.scala    |  2 +-
 .../storage/ShuffleBlockFetcherIterator.scala      | 34 ++++++++++++++++++++--
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index daafe30..c5eefc7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
       SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
       SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
-      readMetrics)
+      readMetrics).toCompletionIterator
 
     val serializerInstance = dep.serializer.newInstance()
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f73c21b..3966980 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -31,7 +31,7 @@ import 
org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.shuffle.{FetchFailedException, 
ShuffleReadMetricsReporter}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, 
Utils}
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
 
 /**
@@ -160,6 +160,8 @@ final class ShuffleBlockFetcherIterator(
   @GuardedBy("this")
   private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]()
 
+  private[this] val onCompleteCallback = new 
ShuffleFetchCompletionListener(this)
+
   initialize()
 
   // Decrements the buffer reference count.
@@ -192,7 +194,7 @@ final class ShuffleBlockFetcherIterator(
   /**
    * Mark the iterator as zombie, and release all buffers that haven't been 
deserialized yet.
    */
-  private[this] def cleanup() {
+  private[storage] def cleanup() {
     synchronized {
       isZombie = true
     }
@@ -364,7 +366,7 @@ final class ShuffleBlockFetcherIterator(
 
   private[this] def initialize(): Unit = {
     // Add a task completion callback (called in both success case and failure 
case) to cleanup.
-    context.addTaskCompletionListener[Unit](_ => cleanup())
+    context.addTaskCompletionListener(onCompleteCallback)
 
     // Split local and remote blocks.
     val remoteRequests = splitLocalRemoteBlocks()
@@ -509,6 +511,11 @@ final class ShuffleBlockFetcherIterator(
     (currentResult.blockId, new BufferReleasingInputStream(input, this))
   }
 
+  def toCompletionIterator: Iterator[(BlockId, InputStream)] = {
+    CompletionIterator[(BlockId, InputStream), this.type](this,
+      onCompleteCallback.onComplete(context))
+  }
+
   private def fetchUpToMaxBytes(): Unit = {
     // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a 
remote host
     // immediately, defer the request until the next time it can be processed.
@@ -609,6 +616,27 @@ private class BufferReleasingInputStream(
   override def reset(): Unit = delegate.reset()
 }
 
+/**
+ * A listener to be called at the completion of the ShuffleBlockFetcherIterator
+ * @param data the ShuffleBlockFetcherIterator to process
+ */
+private class ShuffleFetchCompletionListener(var data: 
ShuffleBlockFetcherIterator)
+  extends TaskCompletionListener {
+
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    if (data != null) {
+      data.cleanup()
+      // Null out the referent here to make sure we don't keep a reference to 
this
+      // ShuffleBlockFetcherIterator, after we're done reading from it, to let 
it be
+      // collected during GC. Otherwise we can hold metadata on block 
locations(blocksByAddress)
+      data = null
+    }
+  }
+
+  // Just an alias for onTaskCompletion to avoid confusing
+  def onComplete(context: TaskContext): Unit = this.onTaskCompletion(context)
+}
+
 private[storage]
 object ShuffleBlockFetcherIterator {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to