[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083311#comment-15083311 ]

Daniel Darabos commented on SPARK-11293:
----------------------------------------

I have a somewhat contrived example that still leaks in 1.6.0. I started 
{{spark-shell --master 'local-cluster[2,2,1024]'}} and ran:

{code}
sc.parallelize(0 to 10000000, 2)
  .map(x => x % 10000 -> x)
  .groupByKey
  .asInstanceOf[org.apache.spark.rdd.ShuffledRDD[Int, Int, Iterable[Int]]]
  .setKeyOrdering(implicitly[Ordering[Int]])
  .mapPartitions { it => it.take(1) }
  .collect
{code}

I've added extra logging around task memory acquisition and release so I could see what is not released. (The {{setKeyOrdering}} cast forces {{BlockStoreShuffleReader}} to push the aggregated output through an {{ExternalSorter}}, as the stack traces below show.) These are the logs:

{code}
16/01/05 17:02:45 INFO Executor: Running task 0.0 in stage 13.0 (TID 24)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Updating epoch to 7 and clearing cache
16/01/05 17:02:45 INFO TorrentBroadcast: Started reading broadcast variable 13
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 2.3 KB, free 7.6 KB)
16/01/05 17:02:45 INFO TorrentBroadcast: Reading broadcast variable 13 took 6 ms
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 4.5 KB, free 12.1 KB)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 6, fetching them
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@192.168.0.32:55147)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Got the output locations
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1 ms
16/01/05 17:02:45 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:45 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
        at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
        at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:47 ERROR TaskMemoryManager: Task 24 acquire 15.0 MB for null
16/01/05 17:02:47 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
        at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
        at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
        at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
        at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
        at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
        at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 10.5 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
        at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
        at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
        at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
        at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 release 20.0 MB from null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
        at org.apache.spark.memory.TaskMemoryManager.releaseExecutionMemory(TaskMemoryManager.java:197)
        at org.apache.spark.util.collection.Spillable$class.releaseMemory(Spillable.scala:111)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.releaseMemory(ExternalAppendOnlyMap.scala:55)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.org$apache$spark$util$collection$ExternalAppendOnlyMap$$freeCurrentMap(ExternalAppendOnlyMap.scala:259)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$iterator$1.apply$mcV$sp(ExternalAppendOnlyMap.scala:251)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR Executor: Managed memory leak detected; size = 16259594 bytes, TID = 24
{code}

The issue is that {{ExternalSorter.stop()}} is only called by the {{CompletionIterator}} when the iterator has been iterated through to the end. But here we only take the first element of each partition, so the cleanup never runs.
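
To make the mechanism concrete, here is a minimal sketch of the {{CompletionIterator}} pattern (simplified, not the actual {{org.apache.spark.util.CompletionIterator}}): the cleanup callback fires only when {{hasNext}} first sees the underlying iterator exhausted.

{code}
// Simplified sketch of the CompletionIterator pattern; names are made up.
class SketchCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private var completed = false
  override def next(): A = sub.next()
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // in Spark this is where the spillable's memory gets released
    }
    more
  }
}

// A consumer that stops early never drives hasNext to false, so the
// callback never runs:
val it = new SketchCompletionIterator(Iterator(1, 2, 3), () => println("released"))
println(it.take(1).toList) // prints List(1); "released" never appears
{code}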

In practice this bites us in a {{zipPartitions}} call (a kind of join) where we do not iterate both iterators to the end.
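
For illustration, a made-up schematic of that pattern (not our actual code): {{Iterator.zip}} stops at the shorter side, so the longer side's shuffle-read iterator is abandoned mid-stream and its spillable collection is never freed.

{code}
// Hypothetical join-like zipPartitions: both inputs have two partitions,
// but per partition the left iterator is much longer than the right one,
// so the left side is never drained to the end.
val left = sc.parallelize(0 until 1000000, 2).map(x => x -> x).sortByKey()
val right = sc.parallelize(0 until 10, 2).map(x => x -> x).sortByKey()
left.zipPartitions(right) { (leftIt, rightIt) =>
  leftIt.zip(rightIt).map { case ((k, a), (_, b)) => k -> (a, b) }
}.collect()
{code}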

Is it illegal not to iterate an RDD iterator to the end? I don't think so: {{RDD.take}} stops short as well. This issue can probably be reproduced with {{RDD.take}} too, though I tried and failed to do so.
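
For reference, the {{RDD.take}} attempt was along these lines (hypothetical shape, not the exact code):

{code}
// A take-based attempt at the same leak; RDD.take also abandons the
// partition iterator early, but this did not log the
// "Managed memory leak detected" warning in my test.
sc.parallelize(0 to 10000000, 2)
  .map(x => x % 10000 -> x)
  .groupByKey
  .take(1)
{code}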

> Spillable collections leak shuffle memory
> -----------------------------------------
>
>                 Key: SPARK-11293
>                 URL: https://issues.apache.org/jira/browse/SPARK-11293
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.6.0
>
>
> I discovered multiple leaks of shuffle memory while working on my memory 
> manager consolidation patch, which added the ability to do strict memory leak 
> detection for the bookkeeping that used to be performed by the 
> ShuffleMemoryManager. This uncovered a handful of places where tasks can 
> acquire execution/shuffle memory but never release it, starving themselves of 
> memory.
>
> Problems that I found:
> * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution 
> memory.
> * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a 
> {{CompletionIterator}}.
> * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing 
> its resources.


