[ https://issues.apache.org/jira/browse/SPARK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610900#comment-15610900 ]
Raul Saez Tapia edited comment on SPARK-14927 at 10/27/16 7:36 AM:
-------------------------------------------------------------------

[~xwu0226] your example works fine for me with Spark 1.6.1. However, it does not work when we use a UDT. My DataFrame shows:

{code}
scala> model_date.toDF.show
+--------+--------------------+
|    date|               model|
+--------+--------------------+
|20160610|[aa.bb.spark.types.PersonWrapper@8542...|
|20160610|[aa.bb.spark.types.PersonWrapper@8831......|
...
...
+--------+--------------------+
{code}

I created the table with some specific table properties, so I can show how the table is defined and how the PersonType UDT maps to the table schema:

{code:sql}
create table model_orc (`model` struct<personWrapper:struct<id:int,name:string>>)
PARTITIONED BY (`date` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES ('path'='hdfs:///user/raulsaez/model_orc')
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION 'hdfs:///user/raulsaez/model_orc'
TBLPROPERTIES('spark.sql.sources.schema.numParts'='1','spark.sql.sources.schema.part.0'='{ \"type\":\"struct\",\"fields\":[{ \"name\":\"personWrapper\",\"type\":{ \"type\":\"udt\",\"class\":\"aa.bb.spark.types.PersonType\",\"pyClass\":null,\"sqlType\":{ \"type\":\"struct\",\"fields\":[{ \"name\":\"id\",\"type\": \"integer\",\"nullable\":true,\"metadata\":{} } ,{ \"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{} }] } },\"nullable\":true,\"metadata\":{} }] }')
{code}

Now we insert data into the table:

{code}
scala> hiveContext.sql("insert into model_orc partition(date=20160610) select model,date from dfJune")
org.apache.spark.sql.AnalysisException: cannot resolve 'cast(model as struct<person:struct<personWrapper:struct<id:int,name:string>>>)' due to data type mismatch: cannot cast StructType(StructField(personWrapper,,true)),true) to StructType(StructField(person,StructType(StructField(id,IntegerType,true),StructField(name,StringType,true),),true)
{code}

I have the same issue with both Parquet and ORC.
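For reference, the PersonWrapper / PersonType pair is roughly shaped like the sketch below. This is a simplified guess at its structure, not the exact code: only the {{id}}/{{name}} fields from the DDL above are real, and the Catalyst row handling (GenericMutableRow, UTF8String) is what the Spark 1.6 UserDefinedType API normally expects, so details may differ from my actual implementation.

{code}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// User-facing class; the annotation ties it to its UDT.
@SQLUserDefinedType(udt = classOf[PersonType])
class PersonWrapper(val id: Int, val name: String) extends Serializable

class PersonType extends UserDefinedType[PersonWrapper] {

  // Catalyst-facing schema: matches the struct<id:int,name:string> in the DDL above.
  override def sqlType: DataType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // Convert the user object into Catalyst's internal row representation.
  override def serialize(obj: Any): Any = obj match {
    case p: PersonWrapper =>
      val row = new GenericMutableRow(2)
      row.update(0, p.id)
      row.update(1, UTF8String.fromString(p.name))
      row
  }

  // Convert the internal row back into the user object.
  override def deserialize(datum: Any): PersonWrapper = datum match {
    case row: InternalRow =>
      new PersonWrapper(row.getInt(0), row.getUTF8String(1).toString)
  }

  override def userClass: Class[PersonWrapper] = classOf[PersonWrapper]
}
{code}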
And if I persist the DataFrame as a table with ORC:

{code}
model_date.toDF.write.format("orc").partitionBy("date").saveAsTable("model_orc_asTable")
{code}

or even if I persist it as an ORC file:

{code}
scala> model_date.toDF.write.mode(SaveMode.Append).format("orc").partitionBy("date").save("model_orc")
{code}

I get a ClassCastException:

{code}
Caused by: java.lang.ClassCastException: aa.bb.spark.types.PersonType cannot be cast to org.apache.spark.sql.types.StructType
	at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:557)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
	at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:568)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$wrap$1.apply(HiveInspectors.scala:590)
	at org.apache.spark.sql.hive.HiveInspectors$$anonfun$wrap$1.apply(HiveInspectors.scala:589)
	at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
	at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:589)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
	at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:568)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrap(OrcRelation.scala:66)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.wrapOrcStruct(OrcRelation.scala:128)
	at org.apache.spark.sql.hive.orc.OrcOutputWriter.writeInternal(OrcRelation.scala:139)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:358)
	... 8 more
{code}

If I persist the DataFrame as a table with Parquet:

{code}
scala> model_date.toDF.write.mode(SaveMode.Append).format("parquet").partitionBy("date").saveAsTable("model_parquet_asTable")
16/10/27 09:39:24 WARN HiveContext$$anon$2: Persisting partitioned data source relation `model_parquet_asTable` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s): hdfs://dev-nameservice/apps/hive/warehouse/model_parquet_astable
...
...
...
scala> hiveContext.sql("select * from model_parquet_asTable where date=20160610").show
+--------------------+--------+
|               model|    date|
+--------------------+--------+
|[aa.bb.spark.types.PersonWrapper@8542...|20160610|
|[aa.bb.spark.types.PersonWrapper@8831...|20160610|
|[aa.bb.spark.types.PersonWrapper@3661...|20160610|
...
...
...
+--------------------+--------+
only showing top 20 rows
{code}

From Hive I can see the table, though it looks empty (I know Spark told me it was stored in a Spark SQL specific format):

{code}
0: jdbc:hive2://imp1tvhdpedg1.corp.du.ae:1000> select * from model_parquet_astable;
+----------------------------+--+
| model_parquet_astable.col  |
+----------------------------+--+
+----------------------------+--+
No rows selected (0.057 seconds)
{code}

And if I persist the DataFrame as a Parquet file, it works fine:

{code}
scala> model_date.toDF.write.mode(SaveMode.Append).format("parquet").partitionBy("date").save("model_parquet_asTable")
...
...
{code}
{code}
$ hdfs dfs -ls -R
drwx------   - raulsaez raulsaez      0 2016-10-27 09:27 model_parquet_asTable/date=20160610
-rw-------   3 raulsaez raulsaez  36725 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00000-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  34638 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00001-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  38752 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00002-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  27702 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00003-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  41743 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00004-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  35128 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00005-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  40996 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00006-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
-rw-------   3 raulsaez raulsaez  29046 2016-10-27 09:27 model_parquet_asTable/date=20160610/part-r-00007-e660a144-53c3-4c74-92c9-bb2323cc247f.gz.parquet
{code}

It would be very useful to have a way to persist a DataFrame with UDT columns as a partitioned table that is also functional from Hive.
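In the meantime, the only workaround I can think of is to unwrap the UDT into plain Catalyst types before writing, and append into a Hive-defined partitioned table with {{insertInto}} rather than {{saveAsTable}}. This is only an untested sketch: the table name {{model_orc_flat}}, the case classes, the {{flatDF}} variable, and the {{person}} field on {{model_date}} are all invented for illustration.

{code}
import org.apache.spark.sql.SaveMode
import hiveContext.implicits._

// Plain case classes mirroring the UDT's sqlType; assumes model_date exposes
// a date and the wrapped person (field names here are guesses).
case class PersonFlat(id: Int, name: String)
case class ModelFlat(date: String, model: PersonFlat)

val flatDF = model_date
  .map(m => ModelFlat(m.date, PersonFlat(m.person.id, m.person.name)))
  .toDF

// Partitioned Hive table with a plain struct column, created through Hive DDL
// so the metastore schema stays Hive-compatible (no Spark-specific format).
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS model_orc_flat (
    |  model struct<id:int,name:string>
    |)
    |PARTITIONED BY (`date` string)
    |STORED AS ORC""".stripMargin)

// insertInto matches columns by position and the partition column must come last;
// do not combine it with partitionBy().
flatDF.select("model", "date")
  .write
  .mode(SaveMode.Append)
  .insertInto("model_orc_flat")
{code}

In theory the partitions should then be visible to both Spark and Hive, though I have not verified this end to end.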
> DataFrame. saveAsTable creates RDD partitions but not Hive partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-14927
>                 URL: https://issues.apache.org/jira/browse/SPARK-14927
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2, 1.6.1
>         Environment: Mac OS X 10.11.4 local
>            Reporter: Sasha Ovsankin
>
> This is a followup to http://stackoverflow.com/questions/31341498/save-spark-dataframe-as-dynamic-partitioned-table-in-hive . I tried to use the suggestions in the answers but couldn't make it work in Spark 1.6.1.
> I am trying to create partitions programmatically from a `DataFrame`. Here is the relevant code (adapted from a Spark test):
>
>     hc.setConf("hive.metastore.warehouse.dir", "tmp/tests")
>     // hc.setConf("hive.exec.dynamic.partition", "true")
>     // hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
>     hc.sql("create database if not exists tmp")
>     hc.sql("drop table if exists tmp.partitiontest1")
>     Seq(2012 -> "a").toDF("year", "val")
>       .write
>       .partitionBy("year")
>       .mode(SaveMode.Append)
>       .saveAsTable("tmp.partitiontest1")
>     hc.sql("show partitions tmp.partitiontest1").show
>
> Full file is here: https://gist.github.com/SashaOv/7c65f03a51c7e8f9c9e018cd42aa4c4a
> I get the error that the table is not partitioned:
>
>     ======================
>     HIVE FAILURE OUTPUT
>     ======================
>     SET hive.support.sql11.reserved.keywords=false
>     SET hive.metastore.warehouse.dir=tmp/tests
>     OK
>     OK
>     FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table tmp.partitiontest1 is not a partitioned table
>     ======================
>
> It looks like the root cause is that `org.apache.spark.sql.hive.HiveMetastoreCatalog.newSparkSQLSpecificMetastoreTable` always creates the table with empty partitions.
> Any help to move this forward is appreciated.
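For the reproduction quoted above, a possible interim workaround (sketch only; the path, the {{partitiontest1_ext}} table name, and the exact DDL are invented, and I have not verified this against 1.6.1) is to write with {{partitionBy}} to a plain path, which does create the partition directory layout, and then point an external Hive table at it and register the partitions explicitly:

{code}
import org.apache.spark.sql.SaveMode
import hc.implicits._

// Write the partition directories to a path instead of using saveAsTable.
Seq(2012 -> "a").toDF("year", "val")
  .write
  .partitionBy("year")
  .mode(SaveMode.Append)
  .parquet("hdfs:///tmp/partitiontest1")

// External Hive table over that path; the partition column is declared in the DDL.
hc.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS tmp.partitiontest1_ext (`val` string)
    |PARTITIONED BY (`year` int)
    |STORED AS PARQUET
    |LOCATION 'hdfs:///tmp/partitiontest1'""".stripMargin)

// Register the directory written above as a partition
// (MSCK REPAIR TABLE is an alternative when there are many of them).
hc.sql("ALTER TABLE tmp.partitiontest1_ext ADD IF NOT EXISTS PARTITION (year=2012)")

hc.sql("show partitions tmp.partitiontest1_ext").show
{code}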