[ https://issues.apache.org/jira/browse/SPARK-24729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531071#comment-16531071 ]
Takeshi Yamamuro commented on SPARK-24729:
------------------------------------------

Can you try running it on v2.3.1?

> Spark - stackoverflow error - org.apache.spark.sql.catalyst.plans.QueryPlan
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-24729
>                 URL: https://issues.apache.org/jira/browse/SPARK-24729
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.1.1
>            Reporter: t oo
>            Priority: Major
>
> I ran a Spark (v2.1.1) job that joins two DataFrames (one read from a .txt
> file on S3, the other from Parquet on S3). The job then merges the data (i.e.
> it keeps the latest row per PK: if a PK exists in both the .txt and the
> Parquet data, the row from the .txt is taken) and writes out new Parquet
> files to S3. It failed with the error below, but on re-running it worked
> fine. Both the .txt and the Parquet data have 302 columns; the .txt has 191
> rows and the Parquet data has 156,300 rows. Does anyone know the cause?
>
> {code:java}
> 18/07/02 13:51:56 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 134, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 6337 bytes)
> 18/07/02 13:51:56 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on 10.160.122.226:38011 (size: 27.2 KB, free: 4.6 GB)
> 18/07/02 13:51:56 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 134) in 295 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:51:56 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
> 18/07/02 13:51:56 INFO DAGScheduler: ResultStage 14 (load at Data.scala:25) finished in 0.295 s
> 18/07/02 13:51:56 INFO DAGScheduler: Job 7 finished: load at Data.scala:25, took 0.310932 s
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pruning directories with:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Post-Scan Filters:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Output Data Schema: struct<row_id: string, created: timestamp, created_by: string, last_upd: timestamp, last_upd_by: string ... 300 more fields>
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pushed Filters:
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 387.2 KB, free 911.2 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 33.7 KB, free 911.1 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.123.242:38105 (size: 33.7 KB, free: 912.2 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 19 from cache at Upsert.scala:25
> 18/07/02 13:51:57 INFO FileSourceScanExec: Planning scan with bin packing, max size: 48443541 bytes, open cost is considered as scanning 4194304 bytes.
> 18/07/02 13:51:57 INFO SparkContext: Starting job: take at Utils.scala:28
> 18/07/02 13:51:57 INFO DAGScheduler: Got job 8 (take at Utils.scala:28) with 1 output partitions
> 18/07/02 13:51:57 INFO DAGScheduler: Final stage: ResultStage 15 (take at Utils.scala:28)
> 18/07/02 13:51:57 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28), which has no missing parents
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 321.5 KB, free 910.8 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 93.0 KB, free 910.7 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.1 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28)
> 18/07/02 13:51:57 INFO TaskSchedulerImpl: Adding task set 15.0 with 1 tasks
> 18/07/02 13:51:57 INFO TaskSetManager: Starting task 0.0 in stage 15.0 (TID 135, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9035 bytes)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.6 GB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.122.226:38011 (size: 33.7 KB, free: 4.6 GB)
> 18/07/02 13:52:05 INFO BlockManagerInfo: Added rdd_61_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Added rdd_63_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO TaskSetManager: Finished task 0.0 in stage 15.0 (TID 135) in 11751 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:52:09 INFO TaskSchedulerImpl: Removed TaskSet 15.0, whose tasks have all completed, from pool
> 18/07/02 13:52:09 INFO DAGScheduler: ResultStage 15 (take at Utils.scala:28) finished in 11.751 s
> 18/07/02 13:52:09 INFO DAGScheduler: Job 8 finished: take at Utils.scala:28, took 11.772561 s
> 18/07/02 13:52:09 INFO CodeGenerator: Code generated in 185.277258 ms
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3459
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3452
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3456
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3455
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3458
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3450
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3460
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3449
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.123.242:38105 in memory (size: 27.2 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.122.226:38011 in memory (size: 27.2 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3462
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.123.242:38105 in memory (size: 3.7 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.122.226:38011 in memory (size: 3.7 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3451
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3684
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.123.242:38105 in memory (size: 25.6 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.122.226:38011 in memory (size: 25.6 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3453
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3457
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.123.242:38105 in memory (size: 4.9 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.122.226:38011 in memory (size: 4.9 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned shuffle 6
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3461
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3454
> 18/07/02 13:52:39 INFO SparkContext: Starting job: run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:39 INFO DAGScheduler: Got job 9 (run at ThreadPoolExecutor.java:1149) with 4 output partitions
> 18/07/02 13:52:39 INFO DAGScheduler: Final stage: ResultStage 16 (run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149), which has no missing parents
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 321.7 KB, free 911.3 MB)
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 93.0 KB, free 911.2 MB)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:39 INFO SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO TaskSchedulerImpl: Adding task set 16.0 with 4 tasks
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 136, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 1.0 in stage 16.0 (TID 137, 10.160.122.226, executor 0, partition 1, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 2.0 in stage 16.0 (TID 138, 10.160.122.226, executor 0, partition 2, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 3.0 in stage 16.0 (TID 139, 10.160.122.226, executor 0, partition 3, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:39 INFO TaskSetManager: Finished task 0.0 in stage 16.0 (TID 136) in 47 ms on 10.160.122.226 (executor 0) (1/4)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.5 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.4 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.4 GB)
> 18/07/02 13:52:49 INFO BlockManagerInfo: Added rdd_63_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:49 INFO TaskSetManager: Finished task 2.0 in stage 16.0 (TID 138) in 10368 ms on 10.160.122.226 (executor 0) (2/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 3.0 in stage 16.0 (TID 139) in 10617 ms on 10.160.122.226 (executor 0) (3/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 1.0 in stage 16.0 (TID 137) in 10668 ms on 10.160.122.226 (executor 0) (4/4)
> 18/07/02 13:52:50 INFO TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool
> 18/07/02 13:52:50 INFO DAGScheduler: ResultStage 16 (run at ThreadPoolExecutor.java:1149) finished in 10.669 s
> 18/07/02 13:52:50 INFO DAGScheduler: Job 9 finished: run at ThreadPoolExecutor.java:1149, took 10.684407 s
> 18/07/02 13:52:50 INFO CodeGenerator: Code generated in 7.746892 ms
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 19.0 MB, free 892.2 MB)
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 3.2 MB, free 889.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 10.160.123.242:38105 (size: 3.2 MB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO SparkContext: Created broadcast 22 from run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.3 GB)
> Exception in thread "main" java.lang.StackOverflowError
>     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>     at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>     at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>     at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>     at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>     at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>     at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
>     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>     at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>     at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>     at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>     at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>     at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>     at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
> {code}
>
> Code that was run:
>
> {code:java}
> // log4j 1.x, as bundled with Spark 2.1, is assumed for Logger.
> import org.apache.log4j.Logger
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.{coalesce, col, lit}
> import org.apache.spark.sql.types.{DecimalType, StructType}
>
> // sparkSession, extract.Data.readHivePartition, stringToStructTypeMapping,
> // addMultipleColToDF and isDfEmpty are defined elsewhere in the job (not shown).
> object Upsert {
>   val logger = Logger.getLogger(getClass.getName)
>
>   // Rows from srcDf (the .txt input) replace rows with the same PK in the
>   // previous Hive/Parquet partition; all other rows are kept.
>   def finalDf(srcDf: DataFrame, partitionPath: Option[String], hiveSchema: StructType,
>               pkList: List[String], srcSchema: StructType) = {
>     logger.info(s"""=====Joining the source file and previous hive partition=====""")
>     //val hiveCols = srcSchema.map(f => col(f.name))
>     // Source columns get a "_" prefix so they do not clash with Hive columns in the join.
>     val srcCols = srcSchema.map(f => col("_" + f.name))
>     // Every decimal column is normalized to DecimalType(31, 8).
>     val finalColsType = srcSchema.map(f =>
>       if (f.dataType.simpleString.contains("decimal")) (f.name, DecimalType(31, 8))
>       else (f.name, f.dataType)
>     )
>     val finalCols = finalColsType.map(_._1)
>     val srcPkList = pkList.map("_" + _)
>     val hivedf = extract.Data.readHivePartition(sparkSession, partitionPath, hiveSchema).cache()
>     val hiveCols = hivedf.dtypes.toList.map(n => (n._1, stringToStructTypeMapping(n._2)))
>     // Columns present in the source schema but missing from the Hive partition.
>     val addedCols = finalColsType.toList.diff(hiveCols)
>     val hivedfNew = addMultipleColToDF(hivedf, addedCols).select(finalCols.map(col(_)): _*).cache()
>     // Null-safe PK equality: nulls are coalesced to the string "null" on both sides.
>     val commonDataFilterCond = srcPkList
>       .zip(pkList)
>       .map { case (c1, c2) => (coalesce(col(c1), lit("null")) === coalesce(col(c2), lit("null"))) }
>       .reduce(_ && _)
>     isDfEmpty(hivedfNew) match {
>       case true => srcDf
>       case false => {
>         val srcRename = srcDf.toDF(srcSchema.map("_" + _.name): _*)
>         val joinData = srcRename.join(hivedfNew, commonDataFilterCond, "inner")
>         // Source rows whose PK also exists in the Hive partition.
>         val commonData = joinData.select(srcCols: _*)
>         // Source rows with a new PK.
>         val currentData = srcRename.except(commonData).cache
>         // Hive rows whose PK does not appear in the source.
>         val prevData = hivedfNew.except(joinData.select(finalCols.map(col(_)): _*))
>         // unionAll is deprecated since Spark 2.0 in favor of union.
>         currentData.unionAll(prevData).unionAll(commonData).toDF(finalCols: _*)
>       }
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
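The merge step described in this report (keep one row per PK, taking the .txt row when a PK exists in both inputs) can be modeled on plain Scala collections to pin down the intended semantics independently of Spark. This is a hypothetical sketch only: it is not the reporter's Spark code and not a fix for the StackOverflowError, and the `UpsertModel` object, the `Row` alias and the sample data are illustrative assumptions.

```scala
// Hypothetical plain-Scala model of the upsert semantics from the report:
// source (.txt) rows always win; previous (Parquet) rows survive only when
// their primary key does not appear in the source.
object UpsertModel {
  // A row is modeled as column name -> value (illustrative representation).
  type Row = Map[String, String]

  def upsert(source: Seq[Row], previous: Seq[Row], pk: Seq[String]): Seq[Row] = {
    // A missing PK value becomes None, and None == None, which loosely mirrors
    // the coalesce(col, lit("null")) null-safe comparison in the reported code.
    def key(r: Row): Seq[Option[String]] = pk.map(r.get)
    val sourceKeys: Set[Seq[Option[String]]] = source.map(key).toSet
    // Keep every source row, plus previous rows whose key is not in the source.
    source ++ previous.filterNot(r => sourceKeys.contains(key(r)))
  }

  def main(args: Array[String]): Unit = {
    val previous = Seq(
      Map("row_id" -> "1", "last_upd_by" -> "alice"),
      Map("row_id" -> "2", "last_upd_by" -> "bob")
    )
    val source = Seq(
      Map("row_id" -> "2", "last_upd_by" -> "carol"),
      Map("row_id" -> "3", "last_upd_by" -> "dave")
    )
    upsert(source, previous, Seq("row_id")).sortBy(_("row_id")).foreach(println)
  }
}
```

Running `main` keeps row 1 from the previous data (alice), replaces row 2 with the source version (carol), and adds row 3 (dave), which is the outcome the Spark job's `except`/`unionAll` combination is meant to produce.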