[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410703#comment-15410703 ]

Sital Kedia commented on SPARK-16922:
-------------------------------------

Update - The query works fine when broadcast hash join is turned off, so the 
issue is likely in the broadcast hash join. I added a debug print to the 
UnsafeRowWriter class 
(https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L214)
 and found that it receives a row of around 800MB and OOMs while trying to 
grow the buffer holder. This suggests there may be some data corruption going 
on, probably in the broadcast hash join. 
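
(A rough sketch of the kind of instrumentation described above - not the 
actual debug print - assuming branch-2.0's BufferHolder with its public 
cursor field and grow(int) method:)

{code}
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;

public final class GrowTracer {
  // Flag any grow request that would push a single row's buffer past 64 MB;
  // the threshold is arbitrary and only meant to surface pathological rows
  // like the ~800MB one observed here.
  private static final long WARN_BYTES = 64L << 20;

  public static void growWithTrace(BufferHolder holder, int neededSize) {
    long requested = (long) holder.cursor + neededSize;
    if (requested > WARN_BYTES) {
      System.err.println("BufferHolder growing to ~" + requested
          + " bytes for a single row");
    }
    holder.grow(neededSize);
  }
}
{code}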

cc [~davies] - Any pointers on how to debug this further? 
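
For anyone trying to reproduce the broadcast-off run above: setting the 
broadcast threshold to -1 disables broadcast hash joins entirely, so the 
planner falls back to a sort-merge join. A minimal sketch assuming the 
standard Spark 2.0 session API (the failing query itself is elided):

{code}
import org.apache.spark.sql.SparkSession;

public class Spark16922Repro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("SPARK-16922 repro")
        .enableHiveSupport()   // the plans below scan Hive metastore tables
        .getOrCreate();

    // -1 disables broadcast hash joins; the join on
    // field5 = cast(cast(field4 as decimal(20,0)) as bigint) then plans
    // as a sort-merge join instead of a BroadcastHashJoin.
    spark.conf().set("spark.sql.autoBroadcastJoinThreshold", -1L);

    // ... run the failing aggregation query here ...
  }
}
{code}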

> Query failure due to executor OOM in Spark 2.0
> ----------------------------------------------
>
>                 Key: SPARK-16922
>                 URL: https://issues.apache.org/jira/browse/SPARK-16922
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.0.0
>            Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace - 
> {code}
>       at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>    +- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>       +- Project [field1#101,field2#74]
>          +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as decimal(20,0)) as bigint)], BuildRight
>             :- ConvertToUnsafe
>             :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
>             +- ConvertToUnsafe
>                +- HiveTableScan [field1#101,field4#97], MetastoreRelation foo, table2, Some(b)
> {code}
> Query plan in Spark 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>    +- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 100.0))])
>       +- *Project [field2#133, field1#160]
>          +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as decimal(20,0)) as bigint)], Inner, BuildRight
>             :- *Filter isnotnull(field5#122L)
>             :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 2013-12-31)]
>             +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as decimal(20,0)) as bigint)))
>                +- *Filter isnotnull(field4#156)
>                   +- HiveTableScan [field4#156, field1#160], MetastoreRelation foo, table2, b
> {code}


