[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-04-29 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829545#comment-16829545
 ] 

Tao Luo commented on SPARK-21492:
-

The problem is that the task won't complete because memory is being leaked (you 
can see this from the simple example above).
Secondly, it's not just the last page: it's every page holding records from unused 
iterators. 
Can we increase the priority of this bug? SMJ is a pretty integral part of 
Spark SQL, and it seems like no progress is being made on this bug, which is 
causing jobs to fail and has no workaround. 

I don't think that it's a hack. The argument seems to be that limit also needs 
to be fixed, so let's not fix this bug until that is fixed too; meanwhile, this 
issue has been lingering since at least July 2017. 
This would fix a memory leak and improve performance by avoiding unnecessary 
spills. Why don't we target this fix at SMJ first, since it's pretty 
isolated to UnsafeExternalRowIterator in SMJ, run it through all the test 
cases, and make additional changes as necessary in the future?
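A minimal sketch of the approach being argued for: release each sorted input's memory as soon as the merge join is finished with it, rather than only when its iterator has been drained to the end. This is a toy Python model, not Spark code and not the change in PR 23762. `PagePool`, `SortedRun`, `merge_join_with_cleanup` and `run_coalesced_partitions` are hypothetical names, and holding exactly one page per input is a simplifying assumption.

```python
class PagePool:
    """Hypothetical fixed-size pool of memory pages shared by one task."""

    def __init__(self, pages):
        self.free = pages

    def acquire(self):
        if self.free == 0:
            raise MemoryError("Unable to acquire a page, got 0")
        self.free -= 1

    def release(self):
        self.free += 1


class SortedRun:
    """Hypothetical sorted input that holds one page until close() is called."""

    def __init__(self, pool, rows):
        self.pool = pool
        self.rows = sorted(rows)
        self.pos = 0
        self.is_open = True
        pool.acquire()

    def exhausted(self):
        return self.pos >= len(self.rows)

    def head(self):
        return self.rows[self.pos]

    def close(self):
        # Idempotent: the page goes back to the pool at most once.
        if self.is_open:
            self.pool.release()
            self.is_open = False


def merge_join_with_cleanup(left, right):
    """Inner merge join on unique sorted keys that always closes both inputs."""
    out = []
    try:
        while not left.exhausted() and not right.exhausted():
            l, r = left.head(), right.head()
            if l < r:
                left.pos += 1
            elif l > r:
                right.pos += 1
            else:
                out.append(l)
                left.pos += 1
                right.pos += 1
    finally:
        # Release both sides even if one of them was never drained.
        left.close()
        right.close()
    return out


def run_coalesced_partitions(num_partitions, pool_pages):
    """Run several joins back to back in one task, as coalesce(1) does."""
    pool = PagePool(pool_pages)
    for p in range(num_partitions):
        left = SortedRun(pool, range(p * 10, p * 10 + 10))
        right = SortedRun(pool, range(p * 10 + 5, p * 10 + 20))
        merge_join_with_cleanup(left, right)
    return pool.free
```

Because both inputs are closed in the `finally` block, `run_coalesced_partitions(20, pool_pages=2)` ends with both pages free again, so two pages are enough for all 20 back-to-back joins.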

I've been porting [this PR|https://github.com/apache/spark/pull/23762] onto my 
production Spark cluster for the last 3 months, but I'm hoping we can get some 
sort of fix into 3.0 at least.

I started a discussion thread here, hopefully people can jump in:
http://apache-spark-developers-list.1001551.n3.nabble.com/Memory-leak-in-SortMergeJoin-td27152.html


> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak 
> caused by the Sort. The memory is not released until the task end, and cannot 
> be used by other operators causing performance drop or OOM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-12 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766598#comment-16766598
 ] 

Tao Luo commented on SPARK-21492:
-

cc [~tejasp], [~kiszk] for input on code generation to address the memory leak. 

[https://github.com/apache/spark/pull/23762] 

 




[jira] [Comment Edited] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-12 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765378#comment-16765378
 ] 

Tao Luo edited comment on SPARK-21492 at 2/12/19 9:35 AM:
--

I'll take a stab at this jira, should have something to review today or 
tomorrow. 

[https://github.com/apache/spark/pull/23762] (took some inspiration from Zhan's 
patch)

I'd appreciate a review, and pointers on modifying the code-gen portion.


was (Author: taoluo):
I'll take a stab at this jira, should have something to review today or 
tomorrow. 




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-11 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765378#comment-16765378
 ] 

Tao Luo commented on SPARK-21492:
-

I'll take a stab at this jira, should have something to review today or 
tomorrow. 




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-08 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763825#comment-16763825
 ] 

Tao Luo commented on SPARK-21492:
-

Can someone add 'affects version' 2.4.0 as well? 




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-08 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763820#comment-16763820
 ] 

Tao Luo commented on SPARK-21492:
-

If SortMergeJoinScanner doesn't consume the iterator from 
UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired 
from TaskMemoryManager will never be released. This leads to a memory leak, 
spills, and OOME. A page will be held per partition of the unused iterator.
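The mechanism can be illustrated with a toy Python model (not Spark code). Each sorted input takes one page from a pool shared by the whole task and returns it only when it is iterated to the end, while the join loop stops as soon as either side runs out. `ToyMemoryManager`, `DrainOnlySortedIterator`, `merge_join` and `run_coalesced_task` are hypothetical names, and one page per input is a simplifying assumption.

```python
class ToyOutOfMemoryError(Exception):
    """Stand-in for SparkOutOfMemoryError in this toy model."""


class ToyMemoryManager:
    """Hypothetical fixed page pool shared by all joins in one task."""

    def __init__(self, pool_pages):
        self.free_pages = pool_pages

    def allocate_page(self):
        if self.free_pages == 0:
            raise ToyOutOfMemoryError("Unable to acquire a page, got 0")
        self.free_pages -= 1

    def free_page(self):
        self.free_pages += 1


class DrainOnlySortedIterator:
    """Holds one page and frees it only when has_next() first sees the end."""

    def __init__(self, manager, rows):
        self.manager = manager
        self.rows = sorted(rows)
        self.pos = 0
        self.holding_page = True
        manager.allocate_page()

    def has_next(self):
        if self.pos < len(self.rows):
            return True
        if self.holding_page:
            # Cleanup happens only on exhaustion.
            self.manager.free_page()
            self.holding_page = False
        return False

    def peek(self):
        return self.rows[self.pos]

    def advance(self):
        self.pos += 1


def merge_join(left, right):
    """Inner merge join on unique sorted keys; stops when either side ends."""
    out = []
    # The loop stops once either side runs out, so the other side is
    # left undrained and keeps its page.
    while left.has_next() and right.has_next():
        l, r = left.peek(), right.peek()
        if l < r:
            left.advance()
        elif l > r:
            right.advance()
        else:
            out.append(l)
            left.advance()
            right.advance()
    return out


def run_coalesced_task(num_partitions, pool_pages):
    """Join several partitions back to back in one task, as coalesce(1) does."""
    manager = ToyMemoryManager(pool_pages)
    for p in range(num_partitions):
        left = DrainOnlySortedIterator(manager, range(p * 10, p * 10 + 10))
        right = DrainOnlySortedIterator(manager, range(p * 10 + 5, p * 10 + 20))
        merge_join(left, right)
    return manager.free_pages
```

In this model, each partition strands the right-hand input's page. With a 4-page pool, `run_coalesced_task(3, pool_pages=4)` returns 1 free page, and a fourth partition raises `ToyOutOfMemoryError`. The failure only appears once enough partitions run back to back in the same task.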



{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{code:java}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
 

Using broadcast succeeds:
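{code:java}
# broadcast join; broadcast() must be imported from pyspark.sql.functions
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}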
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 

 

Just reproduced it in standalone mode using 
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] and 
Python 3.7, with the same code as above.

 

Succeeds with 1 thread: ./bin/pyspark --master local[1]

Fails with >1 thread: ./bin/pyspark --master local[4]
{code:java}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 1]
2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
 


[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-07 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763243#comment-16763243
 ] 

Tao Luo commented on SPARK-24657:
-

If SortMergeJoinScanner doesn't consume UnsafeExternalRowSorter entirely, the 
memory that UnsafeExternalSorter acquired from TaskMemoryManager will never be 
released. This leads to a memory leak, spills, and OOME. A page will be held 
per partition of the unused iterator.

> SortMergeJoin may cause SparkOutOfMemory  in execution memory  because of not 
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1
>Reporter: Joshuawangzj
>Priority: Major
>
> In my SQL, I join three tables, all of which are small (about 
> 2 MB). To solve the small-files issue, I use coalesce(1), but it throws an 
> OOM exception: 
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): 
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, I found the problem by studying the source code. The exception 
> occurs because the task can't allocate a page (in my case, the page size is 
> 32M) from the MemoryManager: coalesce runs 20 parent partitions in one task 
> (spark.sql.shuffle.partitions=20), and after the sort merge join for each 
> parent partition, the UnsafeExternalRowSorter cannot clean up some of the 
> pages it allocated. After the 14th parent partition has run (in my case), 
> there is not enough execution memory left to acquire a page for the sort. 
> Why can't UnsafeExternalRowSorter clean up some of its pages after 
> finishing the join for a parent partition?
> After repeated attempts, I found that the problem is in SortMergeJoinScanner. 
> UnsafeExternalRowSorter cleans up its resources only when its iterator is 
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator 
> reaches its end, bufferedIterator may not have, so bufferedIterator cannot 
> clean up its resources, and vice versa.
> The solution may be:
> 1. advance to last for 
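A back-of-the-envelope estimate for the case reported above. It reads the report as "14 parent partitions had finished before the sort could no longer get a page". It also assumes, though the report does not say so, that each finished partition strands exactly one page and that "32M" means 32 MiB. The helper name `stranded_bytes` is hypothetical.

```python
MIB = 1024 * 1024


def stranded_bytes(finished_partitions, pages_per_partition, page_bytes):
    """Execution memory still held by partitions that already finished joining.

    Assumes every finished partition strands the same number of pages; the
    report only says that "some pages" are not cleaned up.
    """
    return finished_partitions * pages_per_partition * page_bytes


# 14 finished parent partitions, one stranded 32 MiB page each (assumed).
print(stranded_bytes(14, 1, 32 * MIB) // MIB)  # 448
```

Under the same assumption, all 20 parent partitions in the task would strand 640 MiB. This is consistent with the report that the error only appears once many partitions share one task.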

[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16762280#comment-16762280
 ] 

Tao Luo edited comment on SPARK-24657 at 2/7/19 1:53 AM:
-

Just reproduced it in standalone mode using 
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] and 
Python 3.7, with the same code as above.

 

Succeeds with 1 thread: ./bin/pyspark --master local[1]

Fails with >1 thread: ./bin/pyspark --master local[4]
{code:java}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 1]
2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
 



[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/7/19 2:02 AM:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{code:java}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  

Using broadcast succeeds:
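{code:java}
# broadcast join; broadcast() must be imported from pyspark.sql.functions
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}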
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

val spark = SparkSession.builder().getOrCreate() // the constructor is private; this reuses an existing session if there is one
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}



[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/7/19 2:03 AM:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{code:java}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  

Using broadcast succeeds:
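{code:java}
# broadcast join; broadcast() must be imported from pyspark.sql.functions
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}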
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}



[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16762280#comment-16762280
 ] 

Tao Luo commented on SPARK-24657:
-

Just reproduced it in standalone mode using 
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] and 
Python 3.7, with the same code as above.

 

Succeeds with 1 thread: ./bin/pyspark --master local[1]

Fails with >1 thread: ./bin/pyspark --master local[2]
{code:java}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 1]
2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
 

> SortMergeJoin may cause SparkOutOfMemory  in execution memory  because of not 
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1
>Reporter: Joshuawangzj
>Priority: Major
>
> In my SQL, I join three tables, all of which are small (about 2 MB). To avoid 
> the small-files issue, I use coalesce(1), but it throws an OOM exception: 
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): 
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at
> 

[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/6/19 9:31 AM:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
 {{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}


 {{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  

Using broadcast succeeds:
{code:java}
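# broadcast hint (broadcast must be imported alongside rand and col)
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}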
Running on Spark 2.4. 

 

 


was (Author: taoluo):
Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
 {{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
 {{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  

Using broadcast succeeds:
{code:java}
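# broadcast hint (broadcast must be imported alongside rand and col)
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}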

 Running on Spark 2.4. 

 


  


[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo commented on SPARK-24657:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

sqlContext.clearCache()
r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
 
 
Running on Spark 2.4. 
 


[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/6/19 9:22 AM:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
 {{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
 {{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  

Using broadcast succeeds:
{code:java}
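# broadcast hint (broadcast must be imported alongside rand and col)
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}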

 Running on Spark 2.4. 

 


  


was (Author: taoluo):
Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
 {{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
 {{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  
  
 Running on Spark 2.4. 
  


[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-06 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/6/19 9:21 AM:
-

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
 {{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
 {{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
  
  
 Running on Spark 2.4. 
  


was (Author: taoluo):
Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

sqlContext.clearCache()
r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{{== Physical Plan == Coalesce 1 +- *(5) SortMergeJoin [timestamp1#52L], 
[timestamp2#59L], Inner :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0 
: +- Exchange hashpartitioning(timestamp1#52L, 200) : +- *(1) Project [id#50L 
AS timestamp1#52L, rand(-4732263137869282482) AS value#54] : +- *(1) Range (1, 
1001, step=1, splits=4) +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0 
+- Exchange hashpartitioning(timestamp2#59L, 200) +- *(3) Project [id#57L AS 
timestamp2#59L, rand(-3625198886289022666) AS value2#61] +- *(3) Range (1000, 
2001, step=1, splits=4)}}
{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 
21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: 
Unable to acquire 65536 bytes of memory, got 0}}
 
 
Running on Spark 2.4. 
 


[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-04 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328
 ] 

Tao Luo edited comment on SPARK-24657 at 2/5/19 12:25 AM:
--

Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-21492 to 
me.

I can't change the affects version, but I'm hitting this in 2.4.0


was (Author: taoluo):
Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to 
me.

I can't change the affects version, but I'm hitting this in 2.4.0

> SortMergeJoin may cause SparkOutOfMemory  in execution memory  because of not 
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1
>Reporter: Joshuawangzj
>Priority: Major
>
> In my SQL, I join three tables, all of which are small (about 2 MB). To avoid 
> the small-files issue, I use coalesce(1), but it throws an OOM exception: 
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): 
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, by studying the source code, I found the problem. The exception is 
> thrown because the task cannot allocate a page (32 MB per page in my case) 
> from the MemoryManager: coalesce runs 20 parent partitions in one task 
> (spark.sql.shuffle.partitions=20), and after the sort-merge join of each 
> parent partition, UnsafeExternalRowSorter does not clean up some of the pages 
> it allocated. After the 14th parent partition (in my case), there is not 
> enough execution memory left to acquire a page for the sort. 
> Why can't UnsafeExternalRowSorter clean up those pages after the join for a 
> parent partition has finished?
> After repeated attempts, I traced the problem to SortMergeJoinScanner. 
> UnsafeExternalRowSorter cleans up its resources only when its iterator is 
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator 
> reaches its end, bufferedIterator may not have, so 
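The accumulation described above can be illustrated with a toy page pool. If every parent partition leaves some sorter pages allocated, and those pages only come back when the task ends, then a single coalesced task keeps stacking them up until an allocation fails partway through. This is a simplified sketch under stated assumptions: page counts are fixed per partition, the leak is attributed entirely to the buffered side, and `TaskMemoryPool`, `run_coalesced_task` and `OutOfMemory` are hypothetical names, not Spark APIs. The 10-page budget is arbitrary and is not meant to reproduce the reporter's "14th partition" figure.

```python
from typing import Optional


class OutOfMemory(Exception):
    """Stands in for SparkOutOfMemoryError in this toy model."""


class TaskMemoryPool:
    """Toy per-task page pool (hypothetical; not Spark's TaskMemoryManager)."""

    def __init__(self, capacity_pages: int) -> None:
        self.capacity = capacity_pages
        self.used = 0

    def acquire(self, pages: int) -> None:
        if self.used + pages > self.capacity:
            raise OutOfMemory(f"need {pages} page(s), {self.capacity - self.used} free")
        self.used += pages

    def release(self, pages: int) -> None:
        self.used -= pages


def run_coalesced_task(pool: TaskMemoryPool, num_partitions: int,
                       pages_per_side: int, buffered_drained: bool) -> Optional[int]:
    """Join every parent partition inside ONE task, as coalesce(1) does.

    Returns the 1-based index of the first partition whose sort could not get
    memory, or None if all partitions fit.
    """
    for i in range(1, num_partitions + 1):
        try:
            pool.acquire(pages_per_side)  # streamed-side sorter
            pool.acquire(pages_per_side)  # buffered-side sorter
        except OutOfMemory:
            return i
        # The streamed side is read to the end, so its sorter frees its pages.
        pool.release(pages_per_side)
        # The buffered side frees its pages only if it was drained as well;
        # otherwise they stay allocated until the whole task finishes.
        if buffered_drained:
            pool.release(pages_per_side)
    return None


# 20 parent partitions (as with spark.sql.shuffle.partitions=20), a 10-page budget.
print(run_coalesced_task(TaskMemoryPool(10), 20, 2, buffered_drained=False))  # 5
print(run_coalesced_task(TaskMemoryPool(10), 20, 2, buffered_drained=True))   # None
```

With the leak, each partition strands 2 pages, so the fifth partition finds only 2 of the 4 pages it needs; without it, memory use returns to zero after every partition and all 20 fit.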

[jira] [Comment Edited] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-04 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328
 ] 

Tao Luo edited comment on SPARK-24657 at 2/5/19 12:05 AM:
--

Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to 
me.

I can't change the affects version, but I'm hitting this in 2.4.0


was (Author: taoluo):
Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to 
me.

> SortMergeJoin may cause SparkOutOfMemory  in execution memory  because of not 
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1
>Reporter: Joshuawangzj
>Priority: Major
>
> In my sql, It join three tables, and all these tables are small table (about 
> 2mb). And to solve the small files issue, I use coalesce(1). But it throw the 
> oom exception: 
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes 
> of memory, got 0
>   at 
> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at 
> org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, by studying the source code, I found the problem. The exception
> occurs because the task can't allocate a page (in my case, the page size is
> 32M) from the MemoryManager. Coalesce runs 20 parent partitions in one task
> (spark.sql.shuffle.partitions=20), and after the sort merge join for each
> parent partition, the UnsafeExternalRowSorter fails to clean up some of the
> pages it allocated. After the 14th parent partition runs (in my case), there
> is not enough execution memory left to acquire a page for the sort.
> Why can't UnsafeExternalRowSorter clean up some of its pages after the join
> for a parent partition has finished?
> After repeated attempts, I found that the problem is in SortMergeJoinScanner.
> UnsafeExternalRowSorter cleans up its resources only when its iterator is
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator
> reaches the end, bufferedIterator may not have, so bufferedIterator cannot
> clean up its resources, and vice versa.
> The solution 

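The failure mode described in the quoted report can be reproduced in miniature. The Java sketch below is a toy model under stated assumptions, not Spark code. `PagePool`, `SortedIterator`, `mergeJoin`, and `runCoalescedTask` are hypothetical names, a "page" is just a counter, and the pool size of 10 pages is arbitrary. Each sorter holds one page and returns it only when its iterator is advanced past its last element. The join stops as soon as either side runs out, so the longer side keeps its page. Running many parent partitions in one task, as coalesce does, accumulates those pages until an allocation fails. The `drain` flag applies the first solution proposed in the issue, which advances the unfinished iterator to its end.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Toy model of the leak; every class and method here is hypothetical, not Spark's API.
public class SortMergeLeakModel {

    // Task-wide execution memory, counted in pages (stands in for the MemoryManager).
    static final class PagePool {
        private final int capacity;
        private int used = 0;
        PagePool(int capacity) { this.capacity = capacity; }
        void acquire() {
            if (used >= capacity) {
                throw new IllegalStateException("Unable to acquire page, got 0");
            }
            used++;
        }
        void release() { used--; }
    }

    // Sorted iterator that holds one page and frees it only once it is advanced
    // past its last element, as the issue describes for UnsafeExternalRowSorter.
    static final class SortedIterator implements Iterator<Integer> {
        private final int[] keys;
        private final PagePool pool;
        private int pos = 0;
        private boolean released = false;
        SortedIterator(int[] keys, PagePool pool) {
            this.keys = keys;
            this.pool = pool;
            pool.acquire(); // page allocated when the sorter is created
        }
        @Override
        public boolean hasNext() {
            boolean more = pos < keys.length;
            if (!more && !released) { // cleanup happens only on exhaustion
                pool.release();
                released = true;
            }
            return more;
        }
        @Override
        public Integer next() {
            if (!hasNext()) throw new NoSuchElementException();
            return keys[pos++];
        }
    }

    private static Integer advance(Iterator<Integer> it) { return it.hasNext() ? it.next() : null; }

    // Inner merge join over unique sorted keys. It stops as soon as either side
    // runs out, so the other side may never reach its end (and never free its page).
    static int mergeJoin(SortedIterator streamed, SortedIterator buffered, boolean drain) {
        int matches = 0;
        Integer s = advance(streamed);
        Integer b = advance(buffered);
        while (s != null && b != null) {
            int cmp = Integer.compare(s, b);
            if (cmp == 0) { matches++; s = advance(streamed); b = advance(buffered); }
            else if (cmp < 0) { s = advance(streamed); }
            else { b = advance(buffered); }
        }
        if (drain) {
            // Solution 1 from the issue: advance the unfinished iterator to its end.
            while (streamed.hasNext()) streamed.next();
            while (buffered.hasNext()) buffered.next();
        }
        return matches;
    }

    // Runs one join per parent partition inside a single task, as coalesce does,
    // and returns how many partitions finished before the pool ran out.
    static int runCoalescedTask(int partitions, int poolPages, boolean drain) {
        PagePool pool = new PagePool(poolPages);
        int done = 0;
        try {
            for (int p = 0; p < partitions; p++) {
                // The streamed side ends at key 3; the buffered side still has 4..9 unread.
                SortedIterator streamed = new SortedIterator(new int[] {1, 2, 3}, pool);
                SortedIterator buffered = new SortedIterator(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9}, pool);
                mergeJoin(streamed, buffered, drain);
                done++;
            }
        } catch (IllegalStateException oom) {
            // The task fails here, standing in for the SparkOutOfMemoryError in the report.
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println("without drain: " + runCoalescedTask(20, 10, false) + " of 20 partitions");
        System.out.println("with drain:    " + runCoalescedTask(20, 10, true) + " of 20 partitions");
    }
}
```

With these numbers, `main` prints `without drain: 9 of 20 partitions` and `with drain:    20 of 20 partitions`. Each undrained partition leaks one page, so the task fails partway through, much like the reporter's failure after the 14th parent partition. Draining fixes the leak but reads rows that the join never uses. That extra reading is the performance cost the reporter mentions for solution 1.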
[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2019-02-04 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760328#comment-16760328
 ] 

Tao Luo commented on SPARK-24657:
-

Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to 
me.

> SortMergeJoin may cause SparkOutOfMemory in execution memory because of not
> cleanup resource when finished the merge join
> ---
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1
>Reporter: Joshuawangzj
>Priority: Major
>
> My SQL query joins three tables, all of which are small (about 2 MB). To
> solve the small-files issue, I use coalesce(1), but the query throws an OOM
> exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>   at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, by studying the source code, I found the problem. The exception
> occurs because the task can't allocate a page (in my case, the page size is
> 32M) from the MemoryManager. Coalesce runs 20 parent partitions in one task
> (spark.sql.shuffle.partitions=20), and after the sort merge join for each
> parent partition, the UnsafeExternalRowSorter fails to clean up some of the
> pages it allocated. After the 14th parent partition runs (in my case), there
> is not enough execution memory left to acquire a page for the sort.
> Why can't UnsafeExternalRowSorter clean up some of its pages after the join
> for a parent partition has finished?
> After repeated attempts, I found that the problem is in SortMergeJoinScanner.
> UnsafeExternalRowSorter cleans up its resources only when its iterator is
> advanced to the end. But in SortMergeJoinScanner, when streamedIterator
> reaches the end, bufferedIterator may not have, so bufferedIterator cannot
> clean up its resources, and vice versa.
> Possible solutions:
> 1. When one iterator has been traversed to the end, advance the other
> iterator to its end as well. This may reduce performance because of the
> unnecessary traversal.
> 2. When one iterator has been traversed to the end, we