[ 
https://issues.apache.org/jira/browse/SPARK-22137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16185548#comment-16185548
 ] 

Jia-Xuan Liu commented on SPARK-22137:
--------------------------------------

Umm... it still fails.

{code:java}
scala> val tdf = spark.table("test")
tdf: org.apache.spark.sql.DataFrame = [id: bigint, features: vector]

scala> tdf.write.insertInto("table_udt")
org.apache.spark.sql.AnalysisException: cannot resolve 'test.`features`' due to 
data type mismatch: cannot cast org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to 
StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), 
StructField(indices,ArrayType(IntegerType,true),true), 
StructField(values,ArrayType(DoubleType,true),true));;
'InsertIntoHadoopFsRelationCommand 
file:/home/jax/git/spark/spark-warehouse/table_udt, false, Parquet, 
Map(mergeschema -> false), Append, CatalogTable(
Database: default
Table: table_udt
Owner: jax
Created Time: Fri Sep 29 17:07:52 CST 2017
Last Access: Thu Jan 01 08:00:00 CST 1970
Created By: Spark 2.3.0-SNAPSHOT
Type: MANAGED
Provider: hive
Table Properties: [transient_lastDdlTime=1506676072]
Location: file:/home/jax/git/spark/spark-warehouse/table_udt
Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Storage Properties: [serialization.format=1]
Partition Provider: Catalog
Schema: root
 |-- id: integer (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- type: byte (nullable = true)
 |    |-- size: integer (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndex@c1940c95
+- 'Project [cast(id#37L as int) AS id#66, cast(features#38 as 
struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) AS 
features#67]
   +- SubqueryAlias test
      +- Relation[id#37L,features#38] parquet

  at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:95)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:87)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:89)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:89)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:100)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:110)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:114)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:114)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:119)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:119)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:89)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:87)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:89)
  at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:53)
  at 
org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at 
org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:73)
  at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:79)
  at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
  at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85)
  at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81)
  at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
  at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:611)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:283)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:269)
  ... 48 elided

{code}


> Failed to insert VectorUDT to hive table with 
> DataFrameWriter.insertInto(tableName: String)
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22137
>                 URL: https://issues.apache.org/jira/browse/SPARK-22137
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: yzheng616
>
> Failed to insert VectorUDT to hive table with 
> DataFrameWriter.insertInto(tableName: String). The issue seems similar to 
> SPARK-17765, which was resolved in 2.1.0. 
> Error message: 
> {color:red}Exception in thread "main" org.apache.spark.sql.AnalysisException: 
> cannot resolve '`features`' due to data type mismatch: cannot cast 
> org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to 
> StructType(StructField(type,ByteType,true), 
> StructField(size,IntegerType,true), 
> StructField(indices,ArrayType(IntegerType,true),true), 
> StructField(values,ArrayType(DoubleType,true),true));;
> 'InsertIntoTable Relation[id#21,features#22] parquet, 
> OverwriteOptions(false,Map()), false
> +- 'Project [cast(id#13L as int) AS id#27, cast(features#14 as 
> struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) AS 
> features#28]
>    +- LogicalRDD [id#13L, features#14]{color}
> Reproduce code:
> {code:java}
> import scala.annotation.varargs
> import org.apache.spark.ml.linalg.SQLDataTypes
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.LongType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> case class UDT(`id`: Long, `features`: org.apache.spark.ml.linalg.Vector)
> object UDTTest {
>   def main(args: Array[String]): Unit = {
>     val tb = "table_udt"
>     val spark = 
> SparkSession.builder().master("local[4]").appName("UDTInsertInto").enableHiveSupport().getOrCreate()
>     spark.sql("drop table if exists " + tb)
>     
>     /*
>      * VectorUDT sql type definition:
>      * 
>      *   override def sqlType: StructType = {
>      *   StructType(Seq(
>      *        StructField("type", ByteType, nullable = false),
>      *        StructField("size", IntegerType, nullable = true),
>      *        StructField("indices", ArrayType(IntegerType, containsNull = 
> false), nullable = true),
>      *        StructField("values", ArrayType(DoubleType, containsNull = 
> false), nullable = true)))
>      *   }
>     */
>     
>     //Create Hive table base on VectorUDT sql type
>     spark.sql("create table if not exists "+tb+"(id int, features 
> struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)" +
>       " row format serde 
> 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
>       " stored as"+
>         " inputformat 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
>         " outputformat 
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
>     var seq = new scala.collection.mutable.ArrayBuffer[UDT]()
>     for (x <- 1 to 2) {
>       seq += (new UDT(x, org.apache.spark.ml.linalg.Vectors.dense(0.2, 0.21, 
> 0.44)))
>     }
>     val rowRDD = (spark.sparkContext.makeRDD[UDT](seq)).map { x => 
> Row.fromSeq(Seq(x.id,x.features)) }
>     val schema = StructType(Array(StructField("id", 
> LongType,false),StructField("features", SQLDataTypes.VectorType,false)))
>     val df = spark.createDataFrame(rowRDD, schema)
>      
>     //insert into hive table
>     df.write.insertInto(tb)
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to