[jira] [Commented] (SPARK-5775) GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
[ https://issues.apache.org/jira/browse/SPARK-5775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336152#comment-14336152 ]

Anselme Vignon commented on SPARK-5775:
---------------------------------------

[~marmbrus] [~lian cheng] Hi,

I'm quite new to the process of debugging Spark, but the pull request I updated 5 days ago (referenced above) seems to solve this issue. Or did I miss something?

Cheers (and thanks for the awesome work),
Anselme

GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
---------------------------------------------------------------------------------------

             Key: SPARK-5775
             URL: https://issues.apache.org/jira/browse/SPARK-5775
         Project: Spark
      Issue Type: Bug
      Components: SQL
Affects Versions: 1.2.1
        Reporter: Ayoub Benali
        Assignee: Cheng Lian
        Priority: Blocker
          Labels: hivecontext, nested, parquet, partition

Using the LOAD DATA SQL command in a Hive context to load Parquet files into a partitioned table causes exceptions at query time. The bug requires the table to have a column of *type Array of struct* and to be *partitioned*. The example below shows how to reproduce the bug; note that the same query works fine when the table is not partitioned.

{noformat}
scala> val data1 = """{"data_array":[{"field1":1,"field2":2}]}"""
scala> val data2 = """{"data_array":[{"field1":3,"field2":4}]}"""
scala> val jsonRDD = sc.makeRDD(data1 :: data2 :: Nil)
scala> val schemaRDD = hiveContext.jsonRDD(jsonRDD)
scala> schemaRDD.printSchema
root
 |-- data_array: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- field1: integer (nullable = true)
 |    |    |-- field2: integer (nullable = true)

scala> hiveContext.sql("create external table if not exists partitioned_table(data_array ARRAY<STRUCT<field1: INT, field2: INT>>) Partitioned by (date STRING) STORED AS PARQUET Location 'hdfs:///partitioned_table'")
scala> hiveContext.sql("create external table if not exists none_partitioned_table(data_array ARRAY<STRUCT<field1: INT, field2: INT>>) STORED AS PARQUET Location 'hdfs:///none_partitioned_table'")
scala> schemaRDD.saveAsParquetFile("hdfs:///tmp_data_1")
scala> schemaRDD.saveAsParquetFile("hdfs:///tmp_data_2")
scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_1' INTO TABLE partitioned_table PARTITION(date='2015-02-12')")
scala> hiveContext.sql("LOAD DATA INPATH 'hdfs://qa-hdc001.ffm.nugg.ad:8020/erlogd/tmp_data_2' INTO TABLE none_partitioned_table")
scala> hiveContext.sql("select data.field1 from none_partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
res23: Array[org.apache.spark.sql.Row] = Array([1], [3])
scala> hiveContext.sql("select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
15/02/12 16:21:03 INFO ParseDriver: Parsing command: select data.field1 from partitioned_table LATERAL VIEW explode(data_array) nestedStuff AS data
15/02/12 16:21:03 INFO ParseDriver: Parse Completed
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(260661) called with curMem=0, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 254.6 KB, free 267.0 MB)
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(28615) called with curMem=260661, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 27.9 KB, free 267.0 MB)
15/02/12 16:21:03 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on *:51990 (size: 27.9 KB, free: 267.2 MB)
15/02/12 16:21:03 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
15/02/12 16:21:03 INFO SparkContext: Created broadcast 18 from NewHadoopRDD at ParquetTableOperations.scala:119
15/02/12 16:21:03 INFO FileInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO ParquetInputFormat: Total input paths to process : 3
15/02/12 16:21:03 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split Strategy
15/02/12 16:21:03 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
15/02/12 16:21:03 INFO DAGScheduler: Got job 12 (collect at SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
15/02/12 16:21:03 INFO DAGScheduler: Final stage: Stage 13(collect at SparkPlan.scala:84)
15/02/12 16:21:03 INFO DAGScheduler: Parents of final stage: List()
15/02/12 16:21:03 INFO DAGScheduler: Missing parents: List()
15/02/12 16:21:03 INFO DAGScheduler: Submitting Stage 13 (MappedRDD[111] at map at SparkPlan.scala:84), which has no missing parents
15/02/12 16:21:03 INFO MemoryStore: ensureFreeSpace(7632) called with curMem=289276, maxMem=280248975
15/02/12 16:21:03 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 7.5 KB, free 267.0 MB)
{noformat}
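The log above is truncated before the failure itself; per the issue title, the collect on the partitioned table ends in a ClassCastException (GenericRow cannot be cast to SpecificMutableRow). Below is a minimal plain-Scala sketch of that failing pattern, using toy stand-in classes (CastRepro and the simplified Row hierarchy are illustrative only, not Spark's actual internals): the partitioned scan path downcasts every row to a mutable implementation so it can splice the partition value in, but rows carrying nested data arrive as the immutable variant.

{noformat}
trait Row { def get(i: Int): Any }

// Immutable row, the kind produced for nested (array-of-struct) data.
class GenericRow(values: Array[Any]) extends Row {
  def get(i: Int): Any = values(i)
}

// Mutable row, the kind the partitioned scan path expects to receive.
class SpecificMutableRow(values: Array[Any]) extends Row {
  def get(i: Int): Any = values(i)
  def update(i: Int, v: Any): Unit = values(i) = v
}

object CastRepro {
  def main(args: Array[String]): Unit = {
    // First slot holds the nested payload, second is reserved for `date`.
    val fromParquet: Row = new GenericRow(Array[Any](Seq((1, 2)), null))
    // The downcast performed before writing the partition value in place:
    val mutable = fromParquet.asInstanceOf[SpecificMutableRow] // throws ClassCastException
    mutable.update(1, "2015-02-12")
  }
}
{noformat}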
[jira] [Commented] (SPARK-5775) GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table
[ https://issues.apache.org/jira/browse/SPARK-5775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14327938#comment-14327938 ]

Anselme Vignon commented on SPARK-5775:
---------------------------------------

This bug is due to a problem in the TableScanOperations; it does indeed involve both partition columns and complex type columns. I made a pull request patching the issue here: https://github.com/apache/spark/pull/4697
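The patch targets the table scan path. As a hedged sketch of the general shape of such a fix (the real change lives in the pull request above; SafeScan, MutableRow, and withPartitionValue are illustrative stand-ins, not Spark internals), one can copy field values into a fresh mutable row instead of downcasting whatever row the scan produces:

{noformat}
trait Row { def get(i: Int): Any; def size: Int }

class GenericRow(values: Array[Any]) extends Row {
  def get(i: Int): Any = values(i)
  def size: Int = values.length
}

class MutableRow(values: Array[Any]) extends Row {
  def get(i: Int): Any = values(i)
  def size: Int = values.length
  def update(i: Int, v: Any): Unit = values(i) = v
}

object SafeScan {
  // Append a partition column value without assuming the row is mutable.
  def withPartitionValue(row: Row, partitionValue: Any): MutableRow = {
    val out = new MutableRow(new Array[Any](row.size + 1))
    var i = 0
    while (i < row.size) { out.update(i, row.get(i)); i += 1 }
    out.update(row.size, partitionValue)
    out
  }

  def main(args: Array[String]): Unit = {
    val nested: Row = new GenericRow(Array[Any](Seq(Map("field1" -> 1, "field2" -> 2))))
    val withDate = withPartitionValue(nested, "2015-02-12") // no ClassCastException
    println(withDate.get(1)) // prints 2015-02-12
  }
}
{noformat}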