[ 
https://issues.apache.org/jira/browse/SPARK-24729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24729.
----------------------------------
    Resolution: Incomplete

> Spark - stackoverflow error - org.apache.spark.sql.catalyst.plans.QueryPlan
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-24729
>                 URL: https://issues.apache.org/jira/browse/SPARK-24729
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.1.1
>            Reporter: t oo
>            Priority: Major
>              Labels: bulk-closed
>
> Ran a Spark (v2.1.1) job that joins two DataFrames (one read from a .txt file
> on S3, the other from Parquet on S3). The job then merges the datasets (i.e.
> keeps the latest row per PK: if a PK exists in both the .txt and the Parquet,
> the row from the .txt is taken) and writes out new Parquet to S3. The job
> failed with the error below, but upon re-running it worked fine. Both the .txt
> and the Parquet have 302 columns; the .txt has 191 rows and the Parquet has
> 156,300 rows. Does anyone know the cause?
>  
> {code:java}
>  
> 18/07/02 13:51:56 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 134, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 6337 bytes)
> 18/07/02 13:51:56 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on 10.160.122.226:38011 (size: 27.2 KB, free: 4.6 GB)
> 18/07/02 13:51:56 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 134) in 295 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:51:56 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool
> 18/07/02 13:51:56 INFO DAGScheduler: ResultStage 14 (load at Data.scala:25) finished in 0.295 s
> 18/07/02 13:51:56 INFO DAGScheduler: Job 7 finished: load at Data.scala:25, took 0.310932 s
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pruning directories with:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Post-Scan Filters:
> 18/07/02 13:51:57 INFO FileSourceStrategy: Output Data Schema: struct<row_id: string, created: timestamp, created_by: string, last_upd: timestamp, last_upd_by: string ... 300 more fields>
> 18/07/02 13:51:57 INFO FileSourceStrategy: Pushed Filters:
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 387.2 KB, free 911.2 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 33.7 KB, free 911.1 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.123.242:38105 (size: 33.7 KB, free: 912.2 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 19 from cache at Upsert.scala:25
> 18/07/02 13:51:57 INFO FileSourceScanExec: Planning scan with bin packing, max size: 48443541 bytes, open cost is considered as scanning 4194304 bytes.
> 18/07/02 13:51:57 INFO SparkContext: Starting job: take at Utils.scala:28
> 18/07/02 13:51:57 INFO DAGScheduler: Got job 8 (take at Utils.scala:28) with 1 output partitions
> 18/07/02 13:51:57 INFO DAGScheduler: Final stage: ResultStage 15 (take at Utils.scala:28)
> 18/07/02 13:51:57 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28), which has no missing parents
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 321.5 KB, free 910.8 MB)
> 18/07/02 13:51:57 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 93.0 KB, free 910.7 MB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.1 MB)
> 18/07/02 13:51:57 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:51:57 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 15 (MapPartitionsRDD[65] at take at Utils.scala:28)
> 18/07/02 13:51:57 INFO TaskSchedulerImpl: Adding task set 15.0 with 1 tasks
> 18/07/02 13:51:57 INFO TaskSetManager: Starting task 0.0 in stage 15.0 (TID 135, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9035 bytes)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.6 GB)
> 18/07/02 13:51:57 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.160.122.226:38011 (size: 33.7 KB, free: 4.6 GB)
> 18/07/02 13:52:05 INFO BlockManagerInfo: Added rdd_61_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Added rdd_63_0 in memory on 10.160.122.226:38011 (size: 38.9 MB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO TaskSetManager: Finished task 0.0 in stage 15.0 (TID 135) in 11751 ms on 10.160.122.226 (executor 0) (1/1)
> 18/07/02 13:52:09 INFO TaskSchedulerImpl: Removed TaskSet 15.0, whose tasks have all completed, from pool
> 18/07/02 13:52:09 INFO DAGScheduler: ResultStage 15 (take at Utils.scala:28) finished in 11.751 s
> 18/07/02 13:52:09 INFO DAGScheduler: Job 8 finished: take at Utils.scala:28, took 11.772561 s
> 18/07/02 13:52:09 INFO CodeGenerator: Code generated in 185.277258 ms
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3459
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3452
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3456
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3455
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3458
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3450
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3460
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3449
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.123.242:38105 in memory (size: 27.2 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_18_piece0 on 10.160.122.226:38011 in memory (size: 27.2 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3462
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.123.242:38105 in memory (size: 3.7 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_17_piece0 on 10.160.122.226:38011 in memory (size: 3.7 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3451
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3684
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.123.242:38105 in memory (size: 25.6 KB, free: 912.1 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_15_piece0 on 10.160.122.226:38011 in memory (size: 25.6 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3453
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3457
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.123.242:38105 in memory (size: 4.9 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.160.122.226:38011 in memory (size: 4.9 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned shuffle 6
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3461
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:09 INFO BlockManagerInfo: Removed broadcast_20_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:09 INFO ContextCleaner: Cleaned accumulator 3454
> 18/07/02 13:52:39 INFO SparkContext: Starting job: run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:39 INFO DAGScheduler: Got job 9 (run at ThreadPoolExecutor.java:1149) with 4 output partitions
> 18/07/02 13:52:39 INFO DAGScheduler: Final stage: ResultStage 16 (run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO DAGScheduler: Parents of final stage: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Missing parents: List()
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149), which has no missing parents
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 321.7 KB, free 911.3 MB)
> 18/07/02 13:52:39 INFO MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 93.0 KB, free 911.2 MB)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.123.242:38105 (size: 93.0 KB, free: 912.2 MB)
> 18/07/02 13:52:39 INFO SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:996
> 18/07/02 13:52:39 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 16 (MapPartitionsRDD[67] at run at ThreadPoolExecutor.java:1149)
> 18/07/02 13:52:39 INFO TaskSchedulerImpl: Adding task set 16.0 with 4 tasks
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 136, 10.160.122.226, executor 0, partition 0, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 1.0 in stage 16.0 (TID 137, 10.160.122.226, executor 0, partition 1, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 2.0 in stage 16.0 (TID 138, 10.160.122.226, executor 0, partition 2, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO TaskSetManager: Starting task 3.0 in stage 16.0 (TID 139, 10.160.122.226, executor 0, partition 3, PROCESS_LOCAL, 9098 bytes)
> 18/07/02 13:52:39 INFO BlockManagerInfo: Added broadcast_21_piece0 in memory on 10.160.122.226:38011 (size: 93.0 KB, free: 4.5 GB)
> 18/07/02 13:52:39 INFO TaskSetManager: Finished task 0.0 in stage 16.0 (TID 136) in 47 ms on 10.160.122.226 (executor 0) (1/4)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.5 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.4 GB)
> 18/07/02 13:52:46 INFO BlockManagerInfo: Added rdd_61_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.4 GB)
> 18/07/02 13:52:49 INFO BlockManagerInfo: Added rdd_63_2 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:49 INFO TaskSetManager: Finished task 2.0 in stage 16.0 (TID 138) in 10368 ms on 10.160.122.226 (executor 0) (2/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_3 in memory on 10.160.122.226:38011 (size: 38.7 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 3.0 in stage 16.0 (TID 139) in 10617 ms on 10.160.122.226 (executor 0) (3/4)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added rdd_63_1 in memory on 10.160.122.226:38011 (size: 38.8 MB, free: 4.3 GB)
> 18/07/02 13:52:50 INFO TaskSetManager: Finished task 1.0 in stage 16.0 (TID 137) in 10668 ms on 10.160.122.226 (executor 0) (4/4)
> 18/07/02 13:52:50 INFO TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool
> 18/07/02 13:52:50 INFO DAGScheduler: ResultStage 16 (run at ThreadPoolExecutor.java:1149) finished in 10.669 s
> 18/07/02 13:52:50 INFO DAGScheduler: Job 9 finished: run at ThreadPoolExecutor.java:1149, took 10.684407 s
> 18/07/02 13:52:50 INFO CodeGenerator: Code generated in 7.746892 ms
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 19.0 MB, free 892.2 MB)
> 18/07/02 13:52:50 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 3.2 MB, free 889.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 10.160.123.242:38105 (size: 3.2 MB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO SparkContext: Created broadcast 22 from run at ThreadPoolExecutor.java:1149
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.123.242:38105 in memory (size: 93.0 KB, free: 909.0 MB)
> 18/07/02 13:52:50 INFO BlockManagerInfo: Removed broadcast_21_piece0 on 10.160.122.226:38011 in memory (size: 93.0 KB, free: 4.3 GB)
> Exception in thread "main" java.lang.StackOverflowError
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>  at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$sameResult$1.apply(QueryPlan.scala:373)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$forall$extension$1.apply(Tuple2Zipped.scala:101)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:92)
>  at scala.runtime.Tuple2Zipped$$anonfun$exists$extension$1.apply(Tuple2Zipped.scala:90)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.runtime.Tuple2Zipped$.exists$extension(Tuple2Zipped.scala:90)
>  at scala.runtime.Tuple2Zipped$.forall$extension(Tuple2Zipped.scala:101)
>  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:373)
>  
> {code}
>  
> Code that was run:
>  
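> {code:scala}
> import org.apache.log4j.Logger
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.{coalesce, col, lit}
> import org.apache.spark.sql.types.{DecimalType, StructType}
>
> // Not shown in the report: sparkSession, extract.Data.readHivePartition,
> // stringToStructTypeMapping, addMultipleColToDF and isDfEmpty are defined elsewhere.
> object Upsert {
>   val logger = Logger.getLogger(getClass.getName)
>
>   def finalDf(srcDf: DataFrame, partitionPath: Option[String], hiveSchema: StructType,
>               pkList: List[String], srcSchema: StructType) = {
>     logger.info(s"""=====Joining the source file and previous hive partition=====""")
>     //val hiveCols = srcSchema.map(f => col(f.name))
>     // Source columns are referenced by their "_"-prefixed names (see srcRename below).
>     val srcCols = srcSchema.map(f => col("_" + f.name))
>     // Every decimal column is widened to DecimalType(31, 8); other types are unchanged.
>     val finalColsType = srcSchema.map(f =>
>       if (f.dataType.simpleString.contains("decimal")) (f.name, DecimalType(31, 8))
>       else (f.name, f.dataType)
>     )
>     val finalCols = finalColsType.map(_._1)
>     val srcPkList = pkList.map("_" + _)
>     // Previous partition (Parquet), cached.
>     val hivedf = extract.Data.readHivePartition(sparkSession, partitionPath, hiveSchema).cache()
>     val hiveCols = hivedf.dtypes.toList.map(n => (n._1, stringToStructTypeMapping(n._2)))
>     // (name, type) pairs present in the source schema but not in the previous partition.
>     val addedCols = finalColsType.toList.diff(hiveCols)
>     val hivedfNew = addMultipleColToDF(hivedf, addedCols).select(finalCols.map(col(_)): _*).cache()
>     // PK match where a null on either side is coalesced to the string "null".
>     val commonDataFilterCond = srcPkList
>       .zip(pkList)
>       .map { case (c1, c2) => coalesce(col(c1), lit("null")) === coalesce(col(c2), lit("null")) }
>       .reduce(_ && _)
>
>     isDfEmpty(hivedfNew) match {
>       case true => srcDf
>       case false => {
>         val srcRename = srcDf.toDF(srcSchema.map("_" + _.name): _*)
>         val joinData = srcRename.join(hivedfNew, commonDataFilterCond, "inner")
>         // Source rows whose PK also exists in the previous partition.
>         val commonData = joinData.select(srcCols: _*)
>         // Source rows with no matching PK in the previous partition.
>         val currentData = srcRename.except(commonData).cache
>         // Previous rows with no matching PK in the source.
>         val prevData = hivedfNew.except(joinData.select(finalCols.map(col(_)): _*))
>         currentData.unionAll(prevData).unionAll(commonData).toDF(finalCols: _*)
>       }
>     }
>   }
> }
> {code}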
>  
>  
>  
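A minimal sketch of the merge rule described in the report ("if PK exists in txt and parquet then take the row from the .txt"), written against plain Scala collections rather than Spark so it runs standalone. The `UpsertSketch` object, the `Row` alias and the `upsert` function are hypothetical names for illustration only; representing a null key column as `None` only roughly mirrors the `coalesce(..., lit("null"))` comparison in `Upsert.finalDf`. Ignoring the de-duplication that `except` performs, the result matches what the `except`/`unionAll` code aims for: every source row plus each previous row whose PK does not appear in the source.

```scala
object UpsertSketch {
  // Hypothetical row representation: column name -> value (a missing key stands for null).
  type Row = Map[String, String]

  // Primary-key tuple of a row; None plays the role of a null key column.
  private def key(row: Row, pk: Seq[String]): Seq[Option[String]] = pk.map(row.get)

  // Source wins: keep every source row, and keep a previous row only if
  // no source row has the same primary key.
  def upsert(source: Seq[Row], previous: Seq[Row], pk: Seq[String]): Seq[Row] = {
    val sourceKeys = source.map(key(_, pk)).toSet
    source ++ previous.filterNot(row => sourceKeys.contains(key(row, pk)))
  }
}
```

Building the set of source keys once means each previous row is checked with a single set lookup instead of re-scanning the source rows.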

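The repeating frames in the stack trace in the report (`QueryPlan.sameResult` -> `anonfun$sameResult$1.apply` -> `Tuple2Zipped.forall` -> `QueryPlan.sameResult`) show a plan comparison that recurses into child plans pairwise. The sketch below reproduces only that recursion pattern on a hypothetical toy `Node` class. It is not Spark's `QueryPlan` code and uses `zip(...).forall` rather than `Tuple2Zipped`. It illustrates why a deep enough tree can exhaust the thread stack, but it does not establish what made the plan in this report deep.

```scala
// Hypothetical toy tree node; a simplified stand-in, not Spark's QueryPlan.
final class Node(val name: String, val children: List[Node]) {
  // Recursive comparison: every tree level adds stack frames
  // (sameResult -> forall -> closure -> sameResult ...).
  def sameResult(other: Node): Boolean =
    name == other.name &&
      children.size == other.children.size &&
      children.zip(other.children).forall { case (l, r) => l.sameResult(r) }
}

object DeepPlanDemo {
  // Builds a linear chain of `depth` "op" nodes on top of a single "scan" leaf.
  def chain(depth: Int): Node =
    (1 to depth).foldLeft(new Node("scan", Nil)) { (child, _) => new Node("op", List(child)) }

  // True if comparing a chain of this depth with itself exhausts the thread stack.
  def overflows(depth: Int): Boolean = {
    val plan = chain(depth)
    try { plan.sameResult(plan); false }
    catch { case _: StackOverflowError => true }
  }
}
```

`Node` is a plain class rather than a case class so that the generated `equals`, `hashCode` and `toString` (which would also recurse) are never involved.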


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
