Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/23083#discussion_r235299656 --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala --- @@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C]( context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) + val taskListener = new TaskCompletionListener { + override def onTaskCompletion(context: TaskContext): Unit = sorter.stop() + } // Use completion callback to stop sorter if task was finished/cancelled. - context.addTaskCompletionListener[Unit](_ => { - sorter.stop() - }) - CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) + context.addTaskCompletionListener(taskListener) + CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]]( + sorter.iterator, + { + sorter.stop() + // remove task completion listener as soon as the sorter stops to prevent holding + // its references till the end of the task which may lead to memory leaks, for + // example, in case of processing multiple ShuffledRDDPartitions by a single task + // like in case of CoalescedRDD occurred after the ShuffledRDD in the same stage + // (e.g. rdd.repartition(1000).coalesce(10)); + // note that holding sorter references till the end of the task also holds + // references to PartitionedAppendOnlyMap and PartitionedPairBuffer too and these + // ones may consume a significant part of the available memory + context.remoteTaskCompletionListener(taskListener) --- End diff -- Nice catch. Like I said above, do we have another way to remove the reference to the sorter?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org