Repository: spark
Updated Branches:
  refs/heads/master ddd02f50b -> a2d464770


[SPARK-17765][SQL] Support for writing out user-defined type in ORC datasource

## What changes were proposed in this pull request?

This PR adds support for writing out `UserDefinedType` in the ORC data source, 
instead of throwing a `ClassCastException`.

In more detail, the `OrcStruct` is created from the string returned by 
`DataType.catalogString`. For a user-defined type, `catalogString` returns 
`sqlType.simpleString` by default [1]. However, when dispatching on the type to 
match the output against the schema, the code tries to cast the `DataType` 
itself to, for example, `StructType` [2].
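
For illustration, here is a minimal UDT sketch (hypothetical, loosely modelled 
on the test's `MyDenseVector` [3]) showing the mismatch between the catalog 
string and the runtime `DataType`:

``` scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

// Hypothetical UDT backed by array<double>. UserDefinedType is
// private[spark] in 2.x, hence the org.apache.spark.sql package here
// (the test UDT in [3] lives there for the same reason).
class ExampleVectorUDT extends UserDefinedType[Array[Double]] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
  override def serialize(obj: Array[Double]): ArrayData = new GenericArrayData(obj)
  override def deserialize(datum: Any): Array[Double] =
    datum.asInstanceOf[ArrayData].toDoubleArray()
  override def userClass: Class[Array[Double]] = classOf[Array[Double]]
}

object ExampleVectorUDTDemo {
  def main(args: Array[String]): Unit = {
    val udt = new ExampleVectorUDT
    // catalogString delegates to sqlType.simpleString, so the schema string
    // handed to ORC describes a plain array<double>...
    assert(udt.catalogString == "array<double>")
    // ...but the DataType dispatched on at write time is still the UDT
    // itself, so casting it to ArrayType/StructType throws ClassCastException.
    assert(!udt.isInstanceOf[ArrayType])
  }
}
```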

So, running the code below (`MyDenseVector` is borrowed from [3]):

``` scala
val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
val udtDF = data.toDF("id", "vectors")
udtDF.write.orc("/tmp/test.orc")
```

ends up throwing the following exception:

```
java.lang.ClassCastException: org.apache.spark.sql.UDT$MyDenseVectorUDT cannot be cast to org.apache.spark.sql.types.ArrayType
    at org.apache.spark.sql.hive.HiveInspectors$class.wrapperFor(HiveInspectors.scala:381)
    at org.apache.spark.sql.hive.orc.OrcSerializer.wrapperFor(OrcFileFormat.scala:164)
...
```

So, this PR uses `UserDefinedType.sqlType` when finding the correct converter 
for writing out in the ORC data source.
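
The fix, shown in the diff below, boils down to unwrapping the UDT before the 
type dispatch happens. As a standalone sketch (the helper name `toWriteType` 
is illustrative, not the patched method itself):

``` scala
import org.apache.spark.sql.types.{DataType, UserDefinedType}

// Illustrative helper: resolve the type used for converter lookup by
// unwrapping a UserDefinedType to its underlying sqlType; recurses in
// case the underlying type is itself a UDT.
def toWriteType(dataType: DataType): DataType = dataType match {
  case udt: UserDefinedType[_] => toWriteType(udt.sqlType)
  case other => other
}
```

In the actual patch, this unwrapping is the first case of `wrapperFor`, so all 
of the existing cases keep matching on the underlying physical type.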

[1] https://github.com/apache/spark/blob/dfdcab00c7b6200c22883baa3ebc5818be09556f/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L95
[2] https://github.com/apache/spark/blob/d2dc8c4a162834818190ffd82894522c524ca3e5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L326
[3] https://github.com/apache/spark/blob/2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala#L38-L70

## How was this patch tested?

Unit tests in `OrcQuerySuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #15361 from HyukjinKwon/SPARK-17765.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d46477
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d46477
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d46477

Branch: refs/heads/master
Commit: a2d464770cd183daa7d727bf377bde9c21e29e6a
Parents: ddd02f5
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Mon Nov 21 13:23:32 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 21 13:23:32 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/hive/HiveInspectors.scala  |  3 +++
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala     | 10 ++++++++++
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a2d46477/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e303065..52aa108 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -246,6 +246,9 @@ private[hive] trait HiveInspectors {
    * Wraps with Hive types based on object inspector.
    */
   protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
+    case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
+      val sqlType = dataType.asInstanceOf[UserDefinedType[_]].sqlType
+      wrapperFor(oi, sqlType)
     case x: ConstantObjectInspector =>
       (o: Any) =>
         x.getWritableConstantValue

http://git-wip-us.apache.org/repos/asf/spark/blob/a2d46477/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index a628977..b8761e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -93,6 +93,16 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("Read/write UserDefinedType") {
+    withTempPath { path =>
+      val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
+      val udtDF = data.toDF("id", "vectors")
+      udtDF.write.orc(path.getAbsolutePath)
+      val readBack = spark.read.schema(udtDF.schema).orc(path.getAbsolutePath)
+      checkAnswer(udtDF, readBack)
+    }
+  }
+
   test("Creating case class RDD table") {
     val data = (1 to 100).map(i => (i, s"val_$i"))
     sparkContext.parallelize(data).toDF().createOrReplaceTempView("t")

