[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761607#comment-16761607 ]
Tao Luo commented on SPARK-24657:
---------------------------------

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sqlContext.clearCache()

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

{{== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)}}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Running on Spark 2.4.

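For readers who want to see the suspected mechanism without a cluster, here is a toy model in plain Python. It is not Spark code: ToyPool, ToySorter, merge_join and run_task are hypothetical names, and the page counts are arbitrary. It only mimics the behaviour described in the quoted issue below, where a sorter gives its memory back only once its iterator is drained, while the merge join stops as soon as either side ends. The key ranges mirror the repro, where the two inputs overlap in a single key. The "drain" and "cleanup" options correspond loosely to the two solutions proposed in the issue.

```python
class ToyPool:
    """Hypothetical stand-in for a task's execution memory, counted in pages."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.used = 0

    def acquire(self, pages):
        if self.used + pages > self.capacity:
            free = self.capacity - self.used
            raise MemoryError(f"Unable to acquire {pages} page(s), {free} free")
        self.used += pages

    def release(self, pages):
        self.used -= pages


class ToySorter:
    """Hypothetical sorter that releases its page only when fully iterated."""

    def __init__(self, rows, pool):
        pool.acquire(1)
        self._pool = pool
        self._rows = sorted(rows)
        self._released = False

    def __iter__(self):
        for row in self._rows:
            yield row
        # Reached only if the consumer advances the iterator to its end.
        self.cleanup()

    def cleanup(self):
        if not self._released:
            self._pool.release(1)
            self._released = True


def merge_join(left_iter, right_iter):
    """Inner merge join on sorted unique keys; stops when either side ends."""
    matches = []
    left, right = next(left_iter, None), next(right_iter, None)
    while left is not None and right is not None:
        if left == right:
            matches.append(left)
            left, right = next(left_iter, None), next(right_iter, None)
        elif left < right:
            left = next(left_iter, None)
        else:
            right = next(right_iter, None)
    return matches


def run_task(num_partitions, capacity, fix=None):
    """Process several parent partitions in one task, as coalesce(1) would.

    Returns (partitions completed, pages still held by the pool).
    """
    pool = ToyPool(capacity)
    completed = 0
    try:
        for _ in range(num_partitions):
            left = ToySorter(range(1, 6), pool)    # keys 1..5
            right = ToySorter(range(5, 10), pool)  # keys 5..9, one shared key
            left_iter, right_iter = iter(left), iter(right)
            merge_join(left_iter, right_iter)
            if fix == "drain":
                # Advance both iterators to the end so cleanup runs.
                for it in (left_iter, right_iter):
                    for _ in it:
                        pass
            elif fix == "cleanup":
                # Release the sorters' pages directly.
                left.cleanup()
                right.cleanup()
            completed += 1
    except MemoryError:
        pass
    return completed, pool.used


if __name__ == "__main__":
    print(run_task(10, 4))
    print(run_task(10, 4, fix="drain"))
    print(run_task(10, 4, fix="cleanup"))
```

In this model every partition leaves one page behind when no fix is applied, so a 4-page pool runs out while setting up the fourth partition. Both fixes let all 10 partitions finish with nothing held. Whether Spark's actual numbers behave the same way depends on page size and available execution memory, which the model does not try to reproduce.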
> SortMergeJoin may cause SparkOutOfMemory in execution memory because of not
> cleanup resource when finished the merge join
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24657
>                 URL: https://issues.apache.org/jira/browse/SPARK-24657
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1
>            Reporter: Joshuawangzj
>            Priority: Major
>
> My SQL joins three tables, all of them small (about 2 MB each). To solve
> the small-files issue, I use coalesce(1), but it throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>     at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>     at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>     at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>     at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>     at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>     at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> I eventually found the cause by studying the source code. The task cannot
> allocate a page (32 MB per page in my case) from the MemoryManager, because
> coalesce runs 20 parent partitions in one task
> (spark.sql.shuffle.partitions=20), and after the sort merge join for each
> parent partition, UnsafeExternalRowSorter fails to release some of the pages
> it allocated. After the 14th parent partition (in my case), there is not
> enough execution memory left to acquire a page for the sort.
> Why can't UnsafeExternalRowSorter release some of its pages after the join
> for a parent partition finishes?
> After repeated attempts, I traced the problem to SortMergeJoinScanner.
> UnsafeExternalRowSorter cleans up its resources only when its iterator is
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator
> reaches its end, bufferedIterator may not have, so the sorter behind
> bufferedIterator cannot clean up its resources, and vice versa.
> Possible solutions:
> 1. When one iterator has reached its end, advance the other iterator to its
> end as well. This may decrease performance because of the unnecessary
> traversal.
> 2. When one iterator has reached its end, invoke the sorter's cleanup
> method directly. This requires a large change to the source code.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)