[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083311#comment-15083311 ]
Daniel Darabos commented on SPARK-11293:
----------------------------------------

I have a somewhat contrived example that still leaks in 1.6.0. I started {{spark-shell --master 'local-cluster[2,2,1024]'}} and ran:

{code}
sc.parallelize(0 to 10000000, 2).map(x => x % 10000 -> x).groupByKey.asInstanceOf[org.apache.spark.rdd.ShuffledRDD[Int, Int, Iterable[Int]]].setKeyOrdering(implicitly[Ordering[Int]]).mapPartitions { it => it.take(1) }.collect
{code}

I've added extra logging around task memory acquisition so I would be able to see what is not released. These are the logs:

{code}
16/01/05 17:02:45 INFO Executor: Running task 0.0 in stage 13.0 (TID 24)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Updating epoch to 7 and clearing cache
16/01/05 17:02:45 INFO TorrentBroadcast: Started reading broadcast variable 13
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 2.3 KB, free 7.6 KB)
16/01/05 17:02:45 INFO TorrentBroadcast: Reading broadcast variable 13 took 6 ms
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 4.5 KB, free 12.1 KB)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 6, fetching them
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@192.168.0.32:55147)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Got the output locations
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1 ms
16/01/05 17:02:45 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:45 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
    at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:47 ERROR TaskMemoryManager: Task 24 acquire 15.0 MB for null
16/01/05 17:02:47 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
    at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
    at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 10.5 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
    at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
    at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 release 20.0 MB from null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
    at org.apache.spark.memory.TaskMemoryManager.releaseExecutionMemory(TaskMemoryManager.java:197)
    at org.apache.spark.util.collection.Spillable$class.releaseMemory(Spillable.scala:111)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.releaseMemory(ExternalAppendOnlyMap.scala:55)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.org$apache$spark$util$collection$ExternalAppendOnlyMap$$freeCurrentMap(ExternalAppendOnlyMap.scala:259)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$iterator$1.apply$mcV$sp(ExternalAppendOnlyMap.scala:251)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR Executor: Managed memory leak detected; size = 16259594 bytes, TID = 24
{code}

The issue is that {{ExternalSorter.stop()}} is only called by {{CompletionIterator}} if the iterator is iterated through to the end. But here we only take the first element. In practice this happens to us in a {{zipPartitions}} call where we do not iterate both iterators to the end.
(It's a kind of join.)

Is it illegal to not iterate an RDD iterator to the end? I think it's not. {{RDD.take}} stops short as well. This issue can probably be reproduced with {{RDD.take}} too. (I tried and failed.)

> Spillable collections leak shuffle memory
> -----------------------------------------
>
>                 Key: SPARK-11293
>                 URL: https://issues.apache.org/jira/browse/SPARK-11293
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.6.0
>
>
> I discovered multiple leaks of shuffle memory while working on my memory
> manager consolidation patch, which added the ability to do strict memory leak
> detection for the bookkeeping that used to be performed by the
> ShuffleMemoryManager. This uncovered a handful of places where tasks can
> acquire execution/shuffle memory but never release it, starving themselves of
> memory.
> Problems that I found:
> * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution
> memory.
> * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a
> {{CompletionIterator}}.
> * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing
> its resources.
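
The early-termination behavior described in the comment above can be illustrated without Spark. The following is only a minimal sketch: {{SketchCompletionIterator}} and {{EarlyStopSketch}} are hypothetical names invented for this example, not Spark's actual {{CompletionIterator}}, and the callback merely stands in for something like {{ExternalSorter.stop()}}. It assumes the cleanup is triggered the first time {{hasNext}} returns false, which is what the stack trace in the logs suggests.

```scala
// Hypothetical stand-in for a completion-style iterator; NOT Spark's CompletionIterator.
// The cleanup callback runs only when hasNext first reports that the data is exhausted.
class SketchCompletionIterator[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete() // stands in for releasing memory, e.g. a sorter's stop()
    }
    more
  }

  override def next(): A = underlying.next()
}

object EarlyStopSketch {
  // Returns whether the cleanup callback fired after `consume` was applied.
  def cleanupRan(consume: Iterator[Int] => Unit): Boolean = {
    var released = false
    val it = new SketchCompletionIterator[Int](Iterator(1, 2, 3), () => { released = true })
    consume(it)
    released
  }

  def main(args: Array[String]): Unit = {
    // Taking only the first element never reaches the end, so no cleanup happens.
    println(cleanupRan(it => it.take(1).foreach(_ => ()))) // prints false
    // Draining the iterator reaches the end and triggers the cleanup.
    println(cleanupRan(it => it.foreach(_ => ())))         // prints true
  }
}
```

In this sketch the only way to release the resource after a partial read would be a separate hook that runs when the consumer is done, independent of whether the iterator was exhausted. Whether Spark should handle it that way is a design question for the ticket, not something this example settles.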