[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829545#comment-16829545 ] Tao Luo commented on SPARK-21492:

The problem is that the task doesn't complete because memory is leaked (see the simple reproduction posted earlier on this issue). Secondly, it's not just the last page; it's every page holding records from unused iterators.

Can we increase the priority of this bug? SMJ is a pretty integral part of Spark SQL, and it seems like no progress is being made on this bug, which is causing jobs to fail and has no workaround.

I don't think it's a hack. The argument seems to be that limit also needs to be fixed, so this bug shouldn't be fixed until that is fixed too; meanwhile, this issue has been lingering since at least July 2017. The fix would remove a memory leak and improve performance by avoiding unnecessary spills. Why don't we target SMJ first, since the change is fairly isolated to UnsafeExternalRowIterator in SMJ, run it through all the test cases, and make additional changes as necessary later?

I've been applying [this PR|https://github.com/apache/spark/pull/23762] to my production Spark cluster for the last 3 months, but I'm hoping we can get some sort of fix into 3.0 at least.

I started a discussion thread on the dev list; hopefully people can jump in: http://apache-spark-developers-list.1001551.n3.nabble.com/Memory-leak-in-SortMergeJoin-td27152.html

> Memory leak in SortMergeJoin
>
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
> Reporter: Zhan Zhang
> Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak
> caused by the Sort. The memory is not released until the task ends, and cannot
> be used by other operators, causing a performance drop or OOM.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766598#comment-16766598 ] Tao Luo commented on SPARK-21492:

cc [~tejasp], [~kiszk] for input on code generation to address the memory leak. [https://github.com/apache/spark/pull/23762]
[jira] [Comment Edited] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765378#comment-16765378 ] Tao Luo edited comment on SPARK-21492 at 2/12/19 9:35 AM:

I'll take a stab at this JIRA; I should have something to review today or tomorrow.

[https://github.com/apache/spark/pull/23762] (took some inspiration from Zhan's patch)

I'd appreciate a review, and pointers on modifying the code-gen portion.
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765378#comment-16765378 ] Tao Luo commented on SPARK-21492:

I'll take a stab at this jira, should have something to review today or tomorrow.
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763825#comment-16763825 ] Tao Luo commented on SPARK-21492:

Can someone add 'affects version' 2.4.0 as well?
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763820#comment-16763820 ] Tao Luo commented on SPARK-21492:

If SortMergeJoinScanner doesn't consume the iterator from UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired from TaskMemoryManager will never be released. This leads to a memory leak, spills, and OOME. A page will be held per partition of the unused iterator.

{code:python}
# Run in the pyspark shell, where `spark` is predefined.
from pyspark.sql.functions import rand, col

# Force a SortMergeJoin: prefer SMJ and disable automatic broadcast joins.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
# coalesce(1): a single task runs the SMJ over every shuffle partition in turn.
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

{{== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)}}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Using broadcast succeeds:

{code:python}
# Broadcast join instead of SortMergeJoin; broadcast() needs its own import.
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner")
{code}

Running on Spark 2.4.

Or if you prefer Scala:

{code:scala}
// Run in spark-shell, where `spark` is predefined.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

Just reproduced it locally with the stock Spark 2.4.0 binary distribution ([https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz]) and Python 3.7. Same code as above.

Succeeds with 1 thread: ./bin/pyspark --master local[1]
Fails with >1 thread: ./bin/pyspark --master local[4]

{code:python}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show()
{code}

{code:java}
[Stage 2:> (0 + 1) / 1]
2019-02-06 17:12:27 WARN TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
{code}
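A toy Python model (not Spark code) may help make the leak mechanism in this comment concrete. It assumes, for illustration only, that each sorter holds exactly one page of a fixed per-task budget and frees it only when its iterator is read past the last row; the names `TaskMemoryPool`, `ToySorter`, `merge_join` and `run_coalesced_task` are hypothetical, keys are partitioned with a plain modulo instead of Spark's hash partitioning, and page counts are illustrative rather than Spark's real page sizes. With the repro's key ranges (1..1000 joined with 1000..2000), the left side of every partition runs out first, so the right side's page is never returned: one page per partition of the unused iterator.

```python
class SparkOutOfMemoryError(Exception):
    """Stand-in for Spark's SparkOutOfMemoryError in this toy model."""


class TaskMemoryPool:
    """Hypothetical per-task execution-memory budget, counted in pages."""

    def __init__(self, capacity_pages):
        self.capacity = capacity_pages
        self.used = 0

    def acquire_page(self):
        if self.used >= self.capacity:
            raise SparkOutOfMemoryError("Unable to acquire a page, got 0")
        self.used += 1

    def release_page(self):
        self.used -= 1


class ToySorter:
    """Keeps its sorted rows in one page and frees it only at end of iteration."""

    def __init__(self, pool, rows):
        self.pool = pool
        self.rows = sorted(rows)
        self.freed = False
        pool.acquire_page()

    def cleanup(self):
        # Idempotent, so an explicit call after exhaustion is harmless.
        if not self.freed:
            self.pool.release_page()
            self.freed = True

    def __iter__(self):
        for row in self.rows:
            yield row
        # Reached only if the consumer asks for a row after the last one.
        self.cleanup()


def merge_join(left, right):
    """Inner merge join on unique sorted keys; stops once either side runs out."""
    li, ri = iter(left), iter(right)
    l, r = next(li, None), next(ri, None)
    out = []
    while l is not None and r is not None:
        if l < r:
            l = next(li, None)
        elif l > r:
            r = next(ri, None)
        else:
            out.append(l)
            l, r = next(li, None), next(ri, None)
    return out


def run_coalesced_task(left_keys, right_keys, num_partitions, capacity_pages,
                       cleanup_on_finish):
    """One task joining every shuffle partition in turn, like coalesce(1)."""
    pool = TaskMemoryPool(capacity_pages)
    matches = []
    for p in range(num_partitions):
        left = ToySorter(pool, [k for k in left_keys if k % num_partitions == p])
        right = ToySorter(pool, [k for k in right_keys if k % num_partitions == p])
        matches += merge_join(left, right)
        if cleanup_on_finish:
            # Fix idea under discussion: free both sides when this partition's
            # join is done instead of waiting for the task to end.
            left.cleanup()
            right.cleanup()
    return matches, pool.used


if __name__ == "__main__":
    left, right = range(1, 1001), range(1000, 2001)
    # Leaky: every partition leaves its right-side page allocated.
    print(run_coalesced_task(left, right, 200, 1000, cleanup_on_finish=False))  # ([1000], 200)
    # With cleanup: pages are returned after each partition's join.
    print(run_coalesced_task(left, right, 200, 50, cleanup_on_finish=True))  # ([1000], 0)
    try:
        run_coalesced_task(left, right, 200, 50, cleanup_on_finish=False)
    except SparkOutOfMemoryError as e:
        print("leaky run failed:", e)
```

In this model, releasing pages when each partition's join finishes keeps peak usage at two pages no matter how many partitions coalesce(1) folds into one task, while the leaky variant grows by one page per partition until the budget runs out. It only illustrates the idea being discussed in this thread; it is not the change in the linked PR.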
[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763243#comment-16763243 ] Tao Luo commented on SPARK-24657:

If SortMergeJoinScanner doesn't consume UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired from TaskMemoryManager will never be released. This leads to a memory leak, spills, and OOME. A page will be held per partition of the unused iterator.

> SortMergeJoin may cause SparkOutOfMemory in execution memory because of not
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.3.1
> Reporter: Joshuawangzj
> Priority: Major
>
> My SQL query joins three tables, all of them small (about 2 MB). To avoid the
> small-files issue, I use coalesce(1), but it throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
> 	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> 	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> 	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
> 	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> 	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> 	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, I found the problem by studying the source code. The exception occurs
> because the task can't allocate a page (in my case, the page size is 32 MB) from
> the MemoryManager: coalesce runs 20 parent partitions in one task
> (spark.sql.shuffle.partitions=20), and after the sort-merge join of each parent
> partition, UnsafeExternalRowSorter cannot clean up some of the pages it allocated.
> After the 14th parent partition (in my case), there is not enough space in
> execution memory to acquire a page for the sort.
>
> Why can't UnsafeExternalRowSorter clean up some page resources after the join of
> a parent partition has finished?
>
> After repeated attempts, I found that the problem is in SortMergeJoinScanner.
> UnsafeExternalRowSorter cleans up its resources only when its iterator has been
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator reaches
> its end, bufferedIterator may not have, so bufferedIterator cannot clean up its
> resources, and vice versa.
>
> The solution may be:
> 1. advance to last for
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16762280#comment-16762280 ] Tao Luo edited comment on SPARK-24657 at 2/7/19 1:53 AM:

Just reproduced it locally with the stock Spark 2.4.0 binary distribution ([https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz]) and Python 3.7. Same code as in my previous comment.

Succeeds with 1 thread: ./bin/pyspark --master local[1]
Fails with >1 thread: ./bin/pyspark --master local[4]

{code:python}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show()
{code}

{code:java}
[Stage 2:> (0 + 1) / 1]
2019-02-06 17:12:27 WARN TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
{code}
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo edited comment on SPARK-24657 at 2/7/19 2:02 AM:

Sure:

{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

{{== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)}}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Using broadcast succeeds:

{code:python}
# Broadcast join instead of SortMergeJoin; broadcast() needs its own import.
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner")
{code}

Running on Spark 2.4.

Or if you prefer Scala:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

// SparkSession has no public constructor; get the session from the builder
// (inside spark-shell this returns the existing session).
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo edited comment on SPARK-24657 at 2/7/19 2:03 AM:

Sure:

{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

{{== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)}}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Using broadcast succeeds:

{code:python}
# Broadcast join instead of SortMergeJoin; broadcast() needs its own import.
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner")
{code}

Running on Spark 2.4.

Or if you prefer Scala:

{code:scala}
// Run in spark-shell, where `spark` is predefined.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}
[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16762280#comment-16762280 ] Tao Luo commented on SPARK-24657:

Just reproduced it locally with the stock Spark 2.4.0 binary distribution ([https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz]) and Python 3.7. Same code as in my previous comment.

Succeeds with 1 thread: ./bin/pyspark
Fails with >1 thread: ./bin/pyspark --master local[2]

{code:python}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show()
{code}

{code:java}
[Stage 2:> (0 + 1) / 1]
2019-02-06 17:12:27 WARN TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
{code}
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo edited comment on SPARK-24657 at 2/6/19 9:31 AM: - Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Using broadcast succeeds:
{code:python}
from pyspark.sql.functions import broadcast

joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}

Running on Spark 2.4.
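A plausible reason the broadcast variant succeeds is that a broadcast hash join does not sort either side. It builds an in-memory hash table from the small side and streams the other side once, so no per-side sorter is left holding memory. The sketch below is a toy, pure-Python model of that join strategy only. It is not Spark code, `broadcast_hash_join` is a hypothetical name, and the `rand()` columns are replaced by constants so the output is deterministic.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List


def broadcast_hash_join(stream_rows: Iterable[dict], build_rows: Iterable[dict],
                        stream_key: str, build_key: str) -> Iterator[dict]:
    """Inner equi-join in the style of a broadcast hash join (toy model)."""
    # Build phase: materialize the small ("broadcast") side into a hash table.
    table: Dict[object, List[dict]] = defaultdict(list)
    for row in build_rows:
        table[row[build_key]].append(row)
    # Probe phase: one pass over the streamed side; nothing is sorted.
    for row in stream_rows:
        for match in table.get(row[stream_key], []):
            yield {**row, **match}


# Same key ranges as the repro: spark.range(1, 1001) and spark.range(1000, 2001).
r1 = [{"timestamp1": i, "value": 0.5} for i in range(1, 1001)]
r2 = [{"timestamp2": i, "value2": 0.25} for i in range(1000, 2001)]
joined = list(broadcast_hash_join(r1, r2, "timestamp1", "timestamp2"))
print(joined)  # [{'timestamp1': 1000, 'value': 0.5, 'timestamp2': 1000, 'value2': 0.25}]
```

The two ranges overlap only at key 1000, so the join yields a single row regardless of strategy. Only the amount of sorting state differs.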
[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo commented on SPARK-24657: - Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sqlContext.clearCache()

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Running on Spark 2.4.
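In the plan above, each side is sorted and then merged by `SortMergeJoin`. With these key ranges the two sides overlap only at 1000. A merge join can therefore stop as soon as the left side runs out, leaving almost all of the right side unread. The issue description says `UnsafeExternalRowSorter` frees its pages only when its iterator is advanced to the end, so an unread side keeps its memory.

The sketch below is a toy Python model of that behaviour, not Spark's `SortMergeJoinScanner`. `SortedSide`, `merge_join` and `demo` are hypothetical names. The `drain_leftovers` flag models option 1 from the issue description (advance the unfinished iterator to its end), including its cost in extra rows read.

```python
from typing import Iterable, List, Optional, Tuple

Row = Tuple[int, str]


class SortedSide:
    """Toy stand-in for a sorter's output iterator (hypothetical).

    `released` flips to True only once the iterator is advanced past its
    last row, mimicking "free pages only at the end".
    """

    def __init__(self, rows: Iterable[Row]):
        self._it = iter(rows)
        self.rows_read = 0
        self.released = False

    def __iter__(self):
        return self

    def __next__(self) -> Row:
        try:
            row = next(self._it)
        except StopIteration:
            self.released = True  # a real sorter would free its pages here
            raise
        self.rows_read += 1
        return row


def merge_join(left: SortedSide, right: SortedSide,
               drain_leftovers: bool = False) -> List[Tuple[int, str, str]]:
    """Inner join of two key-sorted (key, payload) streams."""
    out = []
    l: Optional[Row] = next(left, None)
    r: Optional[Row] = next(right, None)
    while l is not None and r is not None:
        if l[0] < r[0]:
            l = next(left, None)
        elif l[0] > r[0]:
            r = next(right, None)
        else:
            key, group = l[0], []
            while r is not None and r[0] == key:  # buffer right rows for this key
                group.append(r)
                r = next(right, None)
            while l is not None and l[0] == key:
                out.extend((key, l[1], g[1]) for g in group)
                l = next(left, None)
    if drain_leftovers:
        # Option 1 from the issue: advance whichever side is unfinished to its end.
        for side in (left, right):
            for _ in side:
                pass
    return out


def demo(drain: bool):
    left = SortedSide((i, "v") for i in range(1, 1001))      # like spark.range(1, 1001)
    right = SortedSide((i, "v2") for i in range(1000, 2001))  # like spark.range(1000, 2001)
    rows = merge_join(left, right, drain_leftovers=drain)
    return rows, left, right


rows, left, right = demo(drain=False)
print(rows, left.released, right.released, right.rows_read)  # [(1000, 'v', 'v2')] True False 2
rows, left, right = demo(drain=True)
print(rows, left.released, right.released, right.rows_read)  # [(1000, 'v', 'v2')] True True 1001
```

Without draining, the right side reads only 2 of its 1001 rows and never reaches the point where it would release its memory. Draining releases it, but at the price of reading the other 999 rows. This matches the performance concern raised for option 1.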
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo edited comment on SPARK-24657 at 2/6/19 9:22 AM: - Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Using broadcast succeeds:
{code:python}
from pyspark.sql.functions import broadcast

joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}

Running on Spark 2.4.
was (Author: taoluo): Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Running on Spark 2.4.
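The join in this plan runs over 200 shuffle partitions (`hashpartitioning(..., 200)`). `coalesce(1)` then makes a single task process all of them in turn. Whatever one partition's join fails to release therefore stays allocated for the rest of the task.

The toy Python model below tracks a per-task page budget to show that accumulation. `TaskMemoryPool`, `ToySorter`, `ToyOutOfMemory` and `run_coalesced_task` are hypothetical names. The numbers (a 20-page budget, one page per sorter) are illustrative assumptions, not values taken from Spark.

```python
class ToyOutOfMemory(Exception):
    """Raised when the toy pool cannot satisfy a request."""


class TaskMemoryPool:
    """Toy per-task execution-memory budget, counted in pages (not Spark's API)."""

    def __init__(self, capacity_pages: int):
        self.capacity = capacity_pages
        self.used = 0

    def acquire(self, pages: int) -> None:
        if self.used + pages > self.capacity:
            # Loosely mirrors "Unable to acquire N bytes of memory, got 0".
            raise ToyOutOfMemory(f"Unable to acquire {pages} page(s), got 0")
        self.used += pages

    def release(self, pages: int) -> None:
        self.used -= pages


class ToySorter:
    """Holds its pages until it is drained, as described in the issue."""

    def __init__(self, pool: TaskMemoryPool, pages: int = 1):
        self.pool, self.pages = pool, pages
        pool.acquire(pages)

    def drain(self) -> None:
        if self.pages:
            self.pool.release(self.pages)
            self.pages = 0


def run_coalesced_task(num_partitions: int, capacity_pages: int,
                       drain_buffered: bool) -> int:
    """Join parent partitions one after another inside a single task.

    Returns how many partitions finished before the pool ran out.
    """
    pool = TaskMemoryPool(capacity_pages)
    finished = 0
    try:
        for _ in range(num_partitions):
            streamed = ToySorter(pool)
            buffered = ToySorter(pool)
            streamed.drain()          # the streamed side always runs to its end
            if drain_buffered:
                buffered.drain()      # without this, the page stays held until task end
            finished += 1
    except ToyOutOfMemory:
        pass
    return finished


print(run_coalesced_task(200, capacity_pages=20, drain_buffered=False))  # 19
print(run_coalesced_task(200, capacity_pages=20, drain_buffered=True))   # 200
```

In the real job the budget is bytes of execution memory rather than pages. The issue reporter saw the failure after the 14th of 20 parent partitions. The exact point depends on page sizes and on how many pages each unfinished sorter still holds.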
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607 ] Tao Luo edited comment on SPARK-24657 at 2/6/19 9:21 AM: - Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Running on Spark 2.4.
was (Author: taoluo): Sure:
{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sqlContext.clearCache()

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}

Running on Spark 2.4.
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328 ] Tao Luo edited comment on SPARK-24657 at 2/5/19 12:25 AM: -- Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-21492 to me. I can't change the affects version, but I'm hitting this in 2.4.0

was (Author: taoluo): Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to me. I can't change the affects version, but I'm hitting this in 2.4.0
[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328 ] Tao Luo edited comment on SPARK-24657 at 2/5/19 12:05 AM: -- Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to me. I can't change the affects version, but I'm hitting this in 2.4.0

was (Author: taoluo): Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to me.
[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
[ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328 ]

Tao Luo commented on SPARK-24657:
---

Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to me.

> SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.3.1
> Reporter: Joshuawangzj
> Priority: Major
>
> My SQL query joins three tables, all of them small (about 2 MB). To avoid the small-files problem, I use coalesce(1), but the query throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> I finally found the cause by studying the source code. The exception occurs because the task cannot allocate a page (32 MB per page in my case) from the MemoryManager. coalesce runs 20 parent partitions in one task (spark.sql.shuffle.partitions=20), and after the sort merge join finishes for each parent partition, the UnsafeExternalRowSorter fails to clean up some of the pages it allocated. After the 14th parent partition (in my case), there is not enough execution memory left to acquire a page for the sort.
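The cumulative effect can be illustrated with a minimal simulation. This is not Spark code, and every class and method name below is hypothetical. A fixed pool of whole pages stands in for the task's execution memory. Each "sorter" holds one page and returns it only once its iterator is fully consumed, which approximates the cleanup behaviour the reporter attributes to UnsafeExternalRowSorter. The merge join stops as soon as one side runs out, so each coalesced partition leaves one page behind until an acquire fails. The `drainOnFinish` flag models one possible mitigation: exhaust both iterators after the join.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical simulation (not Spark code) of one task joining many coalesced partitions. */
public class CoalescedJoinLeak {

    /** Stand-in for a task's execution memory, counted in whole pages. */
    static final class PagePool {
        private final int capacity;
        private int used;

        PagePool(int capacity) { this.capacity = capacity; }

        void acquire() {
            if (used >= capacity) throw new IllegalStateException("Unable to acquire a page, got 0");
            used++;
        }

        void release() { used--; }
    }

    /** Sorted run that holds one page and returns it only after its last row is read. */
    static final class SortedRun implements Iterator<Integer> {
        private final PagePool pool;
        private final Iterator<Integer> rows;
        private boolean freed;

        SortedRun(PagePool pool, List<Integer> sortedRows) {
            this.pool = pool;
            pool.acquire(); // sort buffer page; throws if the pool is exhausted
            this.rows = sortedRows.iterator();
            freeIfExhausted();
        }

        private void freeIfExhausted() {
            if (!freed && !rows.hasNext()) {
                pool.release();
                freed = true;
            }
        }

        @Override public boolean hasNext() { return rows.hasNext(); }

        @Override public Integer next() {
            Integer row = rows.next();
            freeIfExhausted(); // cleanup happens only when the run is fully consumed
            return row;
        }
    }

    /** Inner merge join over unique, sorted keys; stops as soon as either side ends. */
    static int mergeJoin(SortedRun left, SortedRun right, boolean drainOnFinish) {
        int matches = 0;
        Integer l = left.hasNext() ? left.next() : null;
        Integer r = right.hasNext() ? right.next() : null;
        while (l != null && r != null) {
            int cmp = Integer.compare(l, r);
            if (cmp == 0) matches++;
            if (cmp <= 0) l = left.hasNext() ? left.next() : null;
            if (cmp >= 0) r = right.hasNext() ? right.next() : null;
        }
        if (drainOnFinish) {
            // Mitigation: exhaust both sides so each run frees its page.
            while (left.hasNext()) left.next();
            while (right.hasNext()) right.next();
        }
        return matches;
    }

    static List<Integer> keys(int n) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(i);
        return out;
    }

    /** Returns how many coalesced partitions finished before a page acquire failed. */
    static int runCoalescedTask(int poolPages, int partitions, boolean drainOnFinish) {
        PagePool pool = new PagePool(poolPages);
        for (int p = 0; p < partitions; p++) {
            try {
                // The right side (5 keys) ends first, so the left run is never fully read.
                SortedRun left = new SortedRun(pool, keys(10));
                SortedRun right = new SortedRun(pool, keys(5));
                mergeJoin(left, right, drainOnFinish);
            } catch (IllegalStateException oom) {
                return p;
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical sizes: a 10-page pool and 20 partitions per task.
        System.out.println("without drain: " + runCoalescedTask(10, 20, false));
        System.out.println("with drain: " + runCoalescedTask(10, 20, true));
    }
}
```

With these hypothetical sizes, the leaky variant finishes 9 of the 20 partitions before an acquire fails. The draining variant finishes all 20. Draining avoids the leak but costs extra reads of rows the join no longer needs.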
> Why can't UnsafeExternalRowSorter clean up some of its pages after the join finishes for a parent partition?
> After repeated attempts, I traced the problem to SortMergeJoinScanner. UnsafeExternalRowSorter cleans up its resources only when its iterator is advanced to the end. But in SortMergeJoinScanner, when streamedIterator reaches its end, bufferedIterator may not have, so bufferedIterator never cleans up its resources, and vice versa.
> Possible solutions:
> 1. When one iterator has been traversed to the end, advance the other iterator to its end as well. This may decrease performance because of the unnecessary traversal.
> 2. When one iterator has traversed to the end, we