[ https://issues.apache.org/jira/browse/SPARK-16922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410703#comment-15410703 ]
Sital Kedia commented on SPARK-16922:
-------------------------------------

Update - The query works fine when broadcast hash join is turned off, so the issue might be in the broadcast hash join. I added some debug prints to the UnsafeRowWriter class (https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L214) and found that it receives a row of around 800MB and OOMs while trying to grow the buffer holder. This suggests that some data corruption is going on, probably in the broadcast hash join.

cc [~davies] - Any pointers on how to debug this issue further?

> Query failure due to executor OOM in Spark 2.0
> ----------------------------------------------
>
>                 Key: SPARK-16922
>                 URL: https://issues.apache.org/jira/browse/SPARK-16922
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.0.0
>            Reporter: Sital Kedia
>
> A query which used to work in Spark 1.6 fails with executor OOM in 2.0.
> Stack trace -
> {code}
>       at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.hash$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$agg_VectorizedHashMap.findOrInsert(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Query plan in Spark 1.6
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Final,isDistinct=false)], output=[field1#101,field3#3])
> +- TungstenExchange hashpartitioning(field1#101,200), None
>    +- TungstenAggregate(key=[field1#101], functions=[(sum((field2#74 / 100.0)),mode=Partial,isDistinct=false)], output=[field1#101,sum#111])
>       +- Project [field1#101,field2#74]
>          +- BroadcastHashJoin [field5#63L], [cast(cast(field4#97 as decimal(20,0)) as bigint)], BuildRight
>             :- ConvertToUnsafe
>             :  +- HiveTableScan [field2#74,field5#63L], MetastoreRelation foo, table1, Some(a), [(ds#57 >= 2013-10-01),(ds#57 <= 2013-12-31)]
>             +- ConvertToUnsafe
>                +- HiveTableScan [field1#101,field4#97], MetastoreRelation foo, table2, Some(b)
> {code}
> Query plan in 2.0
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[field1#160], functions=[sum((field2#133 / 100.0))])
> +- Exchange hashpartitioning(field1#160, 200)
>    +- *HashAggregate(keys=[field1#160], functions=[partial_sum((field2#133 / 100.0))])
>       +- *Project [field2#133, field1#160]
>          +- *BroadcastHashJoin [field5#122L], [cast(cast(field4#156 as decimal(20,0)) as bigint)], Inner, BuildRight
>             :- *Filter isnotnull(field5#122L)
>             :  +- HiveTableScan [field5#122L, field2#133], MetastoreRelation foo, table1, a, [isnotnull(ds#116), (ds#116 >= 2013-10-01), (ds#116 <= 2013-12-31)]
>             +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as decimal(20,0)) as bigint)))
>                +- *Filter isnotnull(field4#156)
>                   +- HiveTableScan [field4#156, field1#160], MetastoreRelation foo, table2, b
> {code}
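
For reference, a minimal sketch of one way to turn off broadcast hash join for a session, as described in the update at the top of this message. This assumes the standard `spark.sql.autoBroadcastJoinThreshold` setting; the comment does not say which mechanism was actually used.

```sql
-- Assumption: disable automatic broadcast joins for the current session.
-- A threshold of -1 tells the planner never to broadcast a table automatically,
-- so the join in the query above would be planned with a different join strategy.
SET spark.sql.autoBroadcastJoinThreshold=-1;
```

Running EXPLAIN on the query after this setting is applied should show whether BroadcastHashJoin is still present in the physical plan.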