[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763243#comment-16763243 ]
Tao Luo commented on SPARK-24657:
---------------------------------

If SortMergeJoinScanner doesn't consume the UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired from the TaskMemoryManager is never released. This leads to a memory leak, spills, and eventually a SparkOutOfMemoryError: one memory page is held per partition by the unconsumed iterator.
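To illustrate the failure mode, here is a minimal, self-contained Scala sketch (hypothetical classes, not Spark's code) of an iterator that releases its memory only when fully drained; if the consumer stops early, the release callback never fires and the "page" stays held until the task ends:

{code:scala}
// Hypothetical model of a sorter iterator that frees its backing memory
// only once it has been advanced past its last element.
class LeakySortedIterator(rows: Array[String], release: () => Unit)
    extends Iterator[String] {
  private var pos = 0
  private var released = false

  override def hasNext: Boolean = {
    if (pos >= rows.length && !released) {
      release() // cleanup runs here, and only here
      released = true
    }
    pos < rows.length
  }

  override def next(): String = {
    val row = rows(pos)
    pos += 1
    row
  }
}

object LeakDemo extends App {
  var pagesHeld = 0 // stand-in for pages acquired from TaskMemoryManager

  def newSortedIterator(): Iterator[String] = {
    pagesHeld += 1 // pretend one memory page was acquired for this sorter
    new LeakySortedIterator(Array("a", "b", "c"), () => pagesHeld -= 1)
  }

  // Consume only part of the iterator, like SortMergeJoinScanner abandoning
  // the buffered side once the streamed side runs out.
  val it = newSortedIterator()
  it.next()
  println(s"pages still held: $pagesHeld") // prints 1: the page is leaked
}
{code}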
> SortMergeJoin may cause SparkOutOfMemory in execution memory because it does not clean up resources when the merge join finishes
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24657
>                 URL: https://issues.apache.org/jira/browse/SPARK-24657
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1
>            Reporter: Joshuawangzj
>            Priority: Major
>
> My SQL joins three tables, all of them small (about 2 MB each). To work around the small-files issue I use coalesce(1), but the query throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>     at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>     at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>     at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> The TaskSetManager then reports the lost task with the same stack trace:
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>     at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>     ... (same frames as above)
> {code}
> I finally tracked the problem down by studying the source code. The exception occurs because the task cannot allocate a page (32 MB per page in my case) from the MemoryManager: coalesce runs 20 parent partitions in one task (spark.sql.shuffle.partitions=20), and after the sort-merge join of each parent partition, UnsafeExternalRowSorter cannot clean up some of the pages it allocated. After the 14th parent partition (in my case), there is no longer enough execution memory to acquire a page for sorting.
> Why can't UnsafeExternalRowSorter clean up its pages once the join of a parent partition has finished?
> After repeated attempts, I found that the problem is in SortMergeJoinScanner. UnsafeExternalRowSorter cleans up its resources only when its iterator is advanced to the end. But in SortMergeJoinScanner, when the streamed iterator is exhausted the buffered iterator may not be, so the buffered iterator never cleans up its resources, and vice versa.
> Possible solutions:
> 1. When one iterator has been fully traversed, advance the other iterator to its end as well (see the sketch below). This may cost performance because of the unnecessary traversal.
> 2. When one iterator has been fully traversed, invoke the other sorter's cleanup method directly. This requires a larger change to the source code.
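To make the trade-off concrete, here is a minimal Scala sketch of option 1 (hypothetical names and structure, not the actual Spark patch): once one side of the join is exhausted, the other side is walked to its end so that its cleanup-on-exhaustion logic runs and its sorter pages are released.

{code:scala}
object DrainOnExhaustion {
  // `streamed` and `buffered` are hypothetical stand-ins for the two
  // UnsafeExternalRowSorter iterators consumed by SortMergeJoinScanner.
  def drain(it: Iterator[_]): Unit = while (it.hasNext) it.next()

  // Option 1: when one side is exhausted, drain the other so that its
  // hasNext eventually returns false and its sorter frees its pages.
  def finishPartition(streamed: Iterator[_], buffered: Iterator[_]): Unit = {
    if (!streamed.hasNext) drain(buffered)
    else if (!buffered.hasNext) drain(streamed)
  }
}
{code}

The drain is linear in the rows remaining on the unconsumed side, which is exactly the performance concern raised for option 1; option 2 avoids that traversal by calling the sorter's cleanup method directly, at the cost of a more invasive change.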