[ https://issues.apache.org/jira/browse/SPARK-5775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust reassigned SPARK-5775:
---------------------------------------

    Assignee: Michael Armbrust

GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
---------------------------------------------------------------------------------------

                Key: SPARK-5775
                URL: https://issues.apache.org/jira/browse/SPARK-5775
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 1.2.1
           Reporter: Ayoub Benali
           Assignee: Michael Armbrust
             Labels: hivecontext, nested, parquet, partition

Using the "LOAD" SQL command in a Hive context to load Parquet files into a partitioned table causes exceptions at query time. The bug requires the table to have a column of *type Array of struct* and to be *partitioned*. The example below shows how to reproduce the bug; note that when the table is not partitioned, the same query works fine.

{noformat}
scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
scala> schemaRDD.printSchema
root
 |-- data_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field1: integer (nullable = true)
 |    |    |-- field2: integer (nullable = true)

scala> hiveContext.sql("create external table if not exists partitioned_table(data_array ARRAY<STRUCT<field1: INT, field2: INT>>) PARTITIONED BY (date STRING) STORED AS PARQUET LOCATION 'hdfs://****/partitioned_table'")

scala> hiveContext.sql("create external table if not exists none_partitioned_table(data_array ARRAY<STRUCT<field1: INT, field2: INT>>) STORED AS PARQUET LOCATION 'hdfs://****/none_partitioned_table'")

scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_1")
scala> schemaRDD.saveAsParquetFile("hdfs://****/tmp_data_2")

scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1' INTO TABLE partitioned_table PARTITION(date='2015-02-12')")

scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2' INTO TABLE none_partitioned_table")

scala> hiveContext.sql("select data.field1 from none_partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
res23: Array[org.apache.spark.sql.Row] = Array([1], [3])

scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
{noformat}
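The last query, against the partitioned table, is the one that fails. As a rough illustration of the mechanism suggested by the stack trace below (the row classes and the helper here are simplified, hypothetical stand-ins, not the actual Catalyst or ParquetTableScan source): the partitioned scan appears to assume every row is a SpecificMutableRow so partition values can be written in place, while rows containing complex types such as ARRAY<STRUCT<...>> come back as GenericRow, so the downcast throws.

{noformat}
// Illustrative sketch only -- simplified stand-ins for the Catalyst row
// classes, not the real org.apache.spark.sql.catalyst.expressions API.
trait Row
class GenericRow(val values: Array[Any]) extends Row
class SpecificMutableRow(val values: Array[Any]) extends Row {
  def update(ordinal: Int, value: Any): Unit = values(ordinal) = value
}

// Hypothetical analogue of the failing step at ParquetTableOperations.scala:147:
// partition values are merged into each row via an unconditional downcast.
def appendPartitionValue(row: Row, ordinal: Int, value: Any): Row = {
  val mutable = row.asInstanceOf[SpecificMutableRow] // throws when row is a GenericRow
  mutable.update(ordinal, value)
  mutable
}

// A row with an array-of-struct column is materialized as a GenericRow, so
// the partitioned scan fails while the non-partitioned scan, which never
// appends partition values, succeeds:
appendPartitionValue(new GenericRow(Array(null, null)), 1, "2015-02-12")
// => java.lang.ClassCastException: GenericRow cannot be cast to SpecificMutableRow
{noformat}

The full log of the failing query: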
{noformat}
15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data
15/02/12 16:21:03 INFO ParseDriver: Parse Completed
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with curMem=0, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 254.6 KB, free 267.0 MB)
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with curMem=260661, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 27.9 KB, free 267.0 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:51990 (size: 27.9 KB, free: 267.2 MB)
15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD at ParquetTableOperations.scala:119
15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split Strategy
15/02/12 16:21:03 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at SparkPlan.scala:84)
15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84), which has no missing parents
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with curMem=289276, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 7.5 KB, free 267.0 MB)
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(4230) called with curMem=296908, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 4.1 KB, free 267.0 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:51990 (size: 4.1 KB, free: 267.2 MB)
15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_19_piece0
15/02/12 16:21:03 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:838
15/02/12 16:21:03 INFO DAGScheduler: Submitting 3 missing tasks from Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84)
15/02/12 16:21:03 INFO TaskSchedulerImpl: Adding task set 13.0 with 3 tasks
15/02/12 16:21:03 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 48, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:03 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 49, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:03 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 50, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:39729 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:48213 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on *****:45394 (size: 4.1 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:39729 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:48213 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *****:45394 (size: 27.9 KB, free: 133.6 MB)
15/02/12 16:21:04 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 48, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.1 in stage 13.0 (TID 51, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.0 in stage 13.0 (TID 49) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 1]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.1 in stage 13.0 (TID 52, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.1 in stage 13.0 (TID 51) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 2]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.2 in stage 13.0 (TID 53, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 50) in 405 ms on ***** (1/3)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.1 in stage 13.0 (TID 52) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 3]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.2 in stage 13.0 (TID 54, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.2 in stage 13.0 (TID 53) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 4]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 0.3 in stage 13.0 (TID 55, *****, NODE_LOCAL, 1640 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 1.2 in stage 13.0 (TID 54) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 5]
15/02/12 16:21:04 INFO TaskSetManager: Starting task 1.3 in stage 13.0 (TID 56, *****, NODE_LOCAL, 1641 bytes)
15/02/12 16:21:04 INFO TaskSetManager: Lost task 0.3 in stage 13.0 (TID 55) on executor *****: java.lang.ClassCastException (org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) [duplicate 6]
15/02/12 16:21:04 ERROR TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
15/02/12 16:21:04 INFO TaskSchedulerImpl: Cancelling stage 13
15/02/12 16:21:04 INFO TaskSchedulerImpl: Stage 13 was cancelled
15/02/12 16:21:04 INFO DAGScheduler: Job 12 failed: collect at SparkPlan.scala:84, took 0.556942 s
15/02/12 16:21:04 WARN TaskSetManager: Lost task 1.3 in stage 13.0 (TID 56, *****): TaskKilled (killed intentionally)
15/02/12 16:21:04 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 55, *****): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:147)
        at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anon$1.next(ParquetTableOperations.scala:144)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
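A possible workaround while this is open (an assumption, not verified against 1.2.1): spark.sql.hive.convertMetastoreParquet controls whether Spark SQL replaces the Hive SerDe with its native Parquet scan for metastore Parquet tables, so disabling it should route the partitioned query through the Hive read path and around ParquetTableScan's row cast:

{noformat}
// Sketch of a possible workaround, assuming the Hive SerDe read path does not
// perform the SpecificMutableRow cast; whether this avoids the bug in 1.2.1
// is an assumption, not something verified here.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

hiveContext.sql(
  "select data.field1 from partitioned_table " +
  "LATERAL VIEW explode(data_array) nestedStuff AS data").collect()
{noformat}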
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org