Davies Liu updated SPARK-15390:
-------------------------------
    Assignee: Davies Liu

Memory management issue in complex DataFrame join and filter
------------------------------------------------------------

                 Key: SPARK-15390
                 URL: https://issues.apache.org/jira/browse/SPARK-15390
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0
         Environment: branch-2.0, 16 workers
            Reporter: Joseph K. Bradley
            Assignee: Davies Liu
             Fix For: 2.0.0

See [SPARK-15389] for a description of the code that produces this bug. I am filing this as a separate JIRA because the bug in 2.0 is different.

In 2.0, the code fails with a memory management error. Here is the stack trace:

{code}
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
16/05/18 19:23:16 ERROR Uncaught throwable from user code: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, None
+- WholeStageCodegen
   :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#170L])
   :     +- Project
   :        +- BroadcastHashJoin [id#70L], [id#110L], Inner, BuildLeft, None
   :           :- INPUT
   :           +- Project [id#110L]
   :              +- Filter (degree#115 > 2000000)
   :                 +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#110L,degree#115])
   :                    +- INPUT
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
   :  +- WholeStageCodegen
   :     :  +- Project [row#66.id AS id#70L]
   :     :     +- Filter isnotnull(row#66.id)
   :     :        +- INPUT
   :     +- Scan ExistingRDD[row#66,uniq_id#67]
   +- Exchange hashpartitioning(id#110L, 200), None
      +- WholeStageCodegen
         :  +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#110L,count#136L])
         :     +- Filter isnotnull(id#110L)
         :        +- INPUT
         +- Generate explode(array(src#2L, dst#3L)), false, false, [id#110L]
            +- WholeStageCodegen
               :  +- Filter ((isnotnull(src#2L) && isnotnull(dst#3L)) && NOT (src#2L = dst#3L))
               :     +- INPUT
               +- InMemoryTableScan [src#2L,dst#3L], [isnotnull(src#2L),isnotnull(dst#3L),NOT (src#2L = dst#3L)], InMemoryRelation [src#2L,dst#3L], true, 10000, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), WholeStageCodegen, None
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2156)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2155)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2155)
    at Notebook.summary$1(<console>:70)
    at Notebook.getIndexedEdges(<console>:82)
    at Notebook.getIndexedGraph(<console>:135)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
    at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.consume(TungstenAggregate.scala:33)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.generateResultCode(TungstenAggregate.scala:432)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduceWithKeys(TungstenAggregate.scala:534)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduce(TungstenAggregate.scala:141)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.produce(TungstenAggregate.scala:33)
    at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduceWithoutKeys(TungstenAggregate.scala:211)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduce(TungstenAggregate.scala:139)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.produce(TungstenAggregate.scala:33)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2156)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2155)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2155)
    at Notebook.summary$1(<console>:70)
    at Notebook.getIndexedEdges(<console>:82)
    at Notebook.getIndexedGraph(<console>:135)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.AssertionError: assertion failed: invalid number of bytes requested: -2146435072
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.memory.ExecutionMemoryPool.acquireMemory(ExecutionMemoryPool.scala:96)
    at org.apache.spark.memory.StaticMemoryManager.acquireExecutionMemory(StaticMemoryManager.scala:98)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:145)
    at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.acquireMemory(HashedRelation.scala:403)
    at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.init(HashedRelation.scala:419)
    at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.<init>(HashedRelation.scala:426)
    at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:795)
    at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:819)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:815)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:80)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:71)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:71)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:71)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
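A note on reading the trace: the {{Boxed Error}} layer is just Scala futures re-wrapping a fatal throwable from the broadcast thread; the real failure is the innermost {{java.lang.AssertionError: invalid number of bytes requested: -2146435072}}, raised in {{ExecutionMemoryPool.acquireMemory}} while {{LongToUnsafeRowMap.init}} sizes the broadcast hash relation. A negative request close to -2^31 is the classic signature of a 32-bit integer overflow in a size computation. Below is a minimal sketch of that failure mode; it is illustrative only, not Spark's actual {{HashedRelation}} code, and the object name and capacity value are assumptions chosen to reproduce the reported number:

{code}
object OverflowSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical slot count for a very large broadcast-side hash map
    // (value chosen so the overflow lands on the reported number).
    val capacity: Int = 268566528

    // Sizing the backing buffer in Int arithmetic overflows:
    // 268566528 * 8 = 2148532224, which wraps around to -2146435072.
    val bytesAsInt: Int = capacity * 8

    // The same computation in Long arithmetic stays correct.
    val bytesAsLong: Long = capacity.toLong * 8

    println(s"Int arithmetic:  $bytesAsInt")   // -2146435072
    println(s"Long arithmetic: $bytesAsLong")  // 2148532224 (~2 GB)

    // Mirrors the assertion that fails in ExecutionMemoryPool.acquireMemory.
    assert(bytesAsInt > 0, s"invalid number of bytes requested: $bytesAsInt")
  }
}
{code}

If that diagnosis is right, the size arithmetic needs to happen in {{Long}} before the request reaches {{TaskMemoryManager.acquireExecutionMemory}}. Until a fix lands, setting {{spark.sql.autoBroadcastJoinThreshold=-1}} to force a shuffle join may work around the problem, assuming the planner (rather than an explicit broadcast hint) chose the {{BroadcastHashJoin}} here.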