Repository: spark
Updated Branches:
  refs/heads/master 221d03acc -> 893ea224c


[SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFileFormat

## What changes were proposed in this pull request?
This PR adds schema verification to Json/Orc/ParquetFileFormat, matching the check that CSVFileFormat already performs.
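
For illustration only (not part of this patch), the user-visible effect is that a write or read with an unsupported schema now fails fast on the driver. A minimal sketch, assuming an existing `SparkSession` named `spark` (e.g. the spark-shell session); the exact exception message may differ:

```scala
import org.apache.spark.sql.functions.lit
import spark.implicits._

// `lit(null)` produces a NullType column, which Parquet cannot store.
val df = Seq(1, 2, 3).toDF("id").withColumn("n", lit(null))

// Expected to be rejected on the driver, before any tasks run, with something like:
//   UnsupportedOperationException: Parquet data source does not support null data type.
df.write.mode("overwrite").parquet("/tmp/spark-24204-demo")
```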

## How was this patch tested?
Added verification tests in `FileBasedDataSourceSuite` and `HiveOrcSourceSuite`.

Author: Takeshi Yamamuro <yamam...@apache.org>

Closes #21389 from maropu/SPARK-24204.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/893ea224
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/893ea224
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/893ea224

Branch: refs/heads/master
Commit: 893ea224cc738766be207c87f4b913fe8fea4c94
Parents: 221d03a
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Wed Jun 27 15:25:51 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Jun 27 15:25:51 2018 -0700

----------------------------------------------------------------------
 .../execution/datasources/DataSourceUtils.scala | 106 +++++++++
 .../datasources/csv/CSVFileFormat.scala         |   4 +-
 .../execution/datasources/csv/CSVUtils.scala    |  19 --
 .../datasources/json/JsonFileFormat.scala       |   4 +
 .../datasources/orc/OrcFileFormat.scala         |   4 +
 .../datasources/parquet/ParquetFileFormat.scala |   3 +
 .../spark/sql/FileBasedDataSourceSuite.scala    | 213 ++++++++++++++++++-
 .../execution/datasources/csv/CSVSuite.scala    |  33 ---
 .../spark/sql/hive/orc/OrcFileFormat.scala      |   4 +
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala |  49 ++++-
 10 files changed, 383 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
new file mode 100644
index 0000000..c534721
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.types._
+
+
+object DataSourceUtils {
+
+  /**
+   * Verify if the schema is supported in datasource in write path.
+   */
+  def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = {
+    verifySchema(format, schema, isReadPath = false)
+  }
+
+  /**
+   * Verify if the schema is supported in datasource in read path.
+   */
+  def verifyReadSchema(format: FileFormat, schema: StructType): Unit = {
+    verifySchema(format, schema, isReadPath = true)
+  }
+
+  /**
+   * Verify if the schema is supported in datasource. This verification should be done
+   * on the driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues`
+   * in `FileFormat`.
+   *
+   * Unsupported data types of csv, json, orc, and parquet are as follows:
+   *  csv -> R/W: Interval, Null, Array, Map, Struct
+   *  json -> W: Interval
+   *  orc -> W: Interval, Null
+   *  parquet -> R/W: Interval, Null
+   */
+  private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
+    def throwUnsupportedException(dataType: DataType): Unit = {
+      throw new UnsupportedOperationException(
+        s"$format data source does not support ${dataType.simpleString} data 
type.")
+    }
+
+    def verifyType(dataType: DataType): Unit = dataType match {
+      case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
+           StringType | BinaryType | DateType | TimestampType | _: DecimalType =>
+
+      // All the unsupported types for CSV
+      case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType
+          if format.isInstanceOf[CSVFileFormat] =>
+        throwUnsupportedException(dataType)
+
+      case st: StructType => st.foreach { f => verifyType(f.dataType) }
+
+      case ArrayType(elementType, _) => verifyType(elementType)
+
+      case MapType(keyType, valueType, _) =>
+        verifyType(keyType)
+        verifyType(valueType)
+
+      case udt: UserDefinedType[_] => verifyType(udt.sqlType)
+
+      // Interval type is not supported in any write path
+      case _: CalendarIntervalType if !isReadPath =>
+        throwUnsupportedException(dataType)
+
+      // JSON and ORC don't support an Interval type, but we pass it in the read path
+      // for backward-compatibility.
+      case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] ||
+        format.isInstanceOf[OrcFileFormat] =>
+
+      // Interval type is not supported in the other read paths
+      case _: CalendarIntervalType =>
+        throwUnsupportedException(dataType)
+
+      // For JSON & ORC backward-compatibility
+      case _: NullType if format.isInstanceOf[JsonFileFormat] ||
+        (isReadPath && format.isInstanceOf[OrcFileFormat]) =>
+
+      // Null type is not supported in the other paths
+      case _: NullType =>
+        throwUnsupportedException(dataType)
+
+      // We keep this default case for safeguards
+      case _ => throwUnsupportedException(dataType)
+    }
+
+    schema.foreach(field => verifyType(field.dataType))
+  }
+}
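
A minimal usage sketch of the new `DataSourceUtils` helper (illustration only, not part of the commit; the schema below is an assumption made for the example):

  import org.apache.spark.sql.execution.datasources.DataSourceUtils
  import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
  import org.apache.spark.sql.types._

  // CSV cannot represent nested types, so this is expected to throw
  // UnsupportedOperationException: "CSV data source does not support array<int> data type."
  val schema = StructType(StructField("xs", ArrayType(IntegerType)) :: Nil)
  DataSourceUtils.verifyWriteSchema(new CSVFileFormat(), schema)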

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index b90275d..fa366cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -66,7 +66,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    CSVUtils.verifySchema(dataSchema)
+    DataSourceUtils.verifyWriteSchema(this, dataSchema)
     val conf = job.getConfiguration
     val csvOptions = new CSVOptions(
       options,
@@ -98,7 +98,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    CSVUtils.verifySchema(dataSchema)
+    DataSourceUtils.verifyReadSchema(this, dataSchema)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 1012e77..7ce65fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -118,25 +118,6 @@ object CSVUtils {
   }
 
   /**
-   * Verify if the schema is supported in CSV datasource.
-   */
-  def verifySchema(schema: StructType): Unit = {
-    def verifyType(dataType: DataType): Unit = dataType match {
-      case ByteType | ShortType | IntegerType | LongType | FloatType |
-           DoubleType | BooleanType | _: DecimalType | TimestampType |
-           DateType | StringType =>
-
-      case udt: UserDefinedType[_] => verifyType(udt.sqlType)
-
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"CSV data source does not support ${dataType.simpleString} data 
type.")
-    }
-
-    schema.foreach(field => verifyType(field.dataType))
-  }
-
-  /**
    * Sample CSV dataset as configured by `samplingRatio`.
    */
   def sample(csv: Dataset[String], options: CSVOptions): Dataset[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index e9a0b38..383bff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -65,6 +65,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    DataSourceUtils.verifyWriteSchema(this, dataSchema)
+
     val conf = job.getConfiguration
     val parsedOptions = new JSONOptions(
       options,
@@ -96,6 +98,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    DataSourceUtils.verifyReadSchema(this, dataSchema)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 1de2ca2..df488a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -89,6 +89,8 @@ class OrcFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    DataSourceUtils.verifyWriteSchema(this, dataSchema)
+
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
 
     val conf = job.getConfiguration
@@ -141,6 +143,8 @@ class OrcFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    DataSourceUtils.verifyReadSchema(this, dataSchema)
+
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       OrcFilters.createFilter(dataSchema, filters).foreach { f =>
         OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 60fc9ec..9602a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -78,6 +78,7 @@ class ParquetFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    DataSourceUtils.verifyWriteSchema(this, dataSchema)
 
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
 
@@ -302,6 +303,8 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    DataSourceUtils.verifyReadSchema(this, dataSchema)
+
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 0630309..86f9647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql
 
-import java.io.FileNotFoundException
+import java.io.{File, FileNotFoundException}
+import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -202,4 +204,213 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  // Unsupported data types of csv, json, orc, and parquet are as follows;
+  //  csv -> R/W: Interval, Null, Array, Map, Struct
+  //  json -> W: Interval
+  //  orc -> W: Interval, Null
+  //  parquet -> R/W: Interval, Null
+  test("SPARK-24204 error handling for unsupported Array/Map/Struct types - 
csv") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      var msg = intercept[UnsupportedOperationException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, 
b)").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support 
struct<a:int,b:string> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType.fromDDL("a struct<b: Int>")
+        spark.range(1).write.mode("overwrite").csv(csvDir)
+        spark.read.schema(schema).csv(csvDir).collect()
+      }.getMessage
+      assert(msg.contains("CSV data source does not support struct<b:int> data 
type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Map("Tesla" -> 3))).toDF("id", 
"cars").write.mode("overwrite").csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<string,int> 
data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType.fromDDL("a map<int, int>")
+        spark.range(1).write.mode("overwrite").csv(csvDir)
+        spark.read.schema(schema).csv(csvDir).collect()
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<int,int> data 
type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
+          .write.mode("overwrite").csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<string> data 
type"))
+
+      msg = intercept[UnsupportedOperationException] {
+         val schema = StructType.fromDDL("a array<int>")
+         spark.range(1).write.mode("overwrite").csv(csvDir)
+         spark.read.schema(schema).csv(csvDir).collect()
+       }.getMessage
+      assert(msg.contains("CSV data source does not support array<int> data 
type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors")
+          .write.mode("overwrite").csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<double> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
+        spark.range(1).write.mode("overwrite").csv(csvDir)
+        spark.read.schema(schema).csv(csvDir).collect()
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<double> data type."))
+    }
+  }
+
+  test("SPARK-24204 error handling for unsupported Interval data types - csv, 
json, parquet, orc") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+
+      // write path
+      Seq("csv", "json", "parquet", "orc").foreach { format =>
+        var msg = intercept[AnalysisException] {
+          sql("select interval 1 
days").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.contains("Cannot save interval data type into external 
storage."))
+
+        msg = intercept[UnsupportedOperationException] {
+          spark.udf.register("testType", () => new IntervalData())
+          sql("select 
testType()").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support calendarinterval 
data type."))
+      }
+
+      // read path
+      Seq("parquet", "csv").foreach { format =>
+        var msg = intercept[UnsupportedOperationException] {
+          val schema = StructType(StructField("a", CalendarIntervalType, true) 
:: Nil)
+          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+          spark.read.schema(schema).format(format).load(tempDir).collect()
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support calendarinterval 
data type."))
+
+        msg = intercept[UnsupportedOperationException] {
+          val schema = StructType(StructField("a", new IntervalUDT(), true) :: 
Nil)
+          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+          spark.read.schema(schema).format(format).load(tempDir).collect()
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support calendarinterval 
data type."))
+      }
+
+      // We expect the types below to pass for backward-compatibility
+      Seq("orc", "json").foreach { format =>
+        // Interval type
+        var schema = StructType(StructField("a", CalendarIntervalType, true) 
:: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+
+        // UDT having interval data
+        schema = StructType(StructField("a", new IntervalUDT(), true) :: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+      }
+    }
+  }
+
+  test("SPARK-24204 error handling for unsupported Null data types - csv, 
parquet, orc") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+
+      Seq("orc").foreach { format =>
+        // write path
+        var msg = intercept[UnsupportedOperationException] {
+          sql("select 
null").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+
+        msg = intercept[UnsupportedOperationException] {
+          spark.udf.register("testType", () => new NullData())
+          sql("select 
testType()").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+
+        // read path
+        // We expect the types below to pass for backward-compatibility
+
+        // Null type
+        var schema = StructType(StructField("a", NullType, true) :: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+
+        // UDT having null data
+        schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+        spark.read.schema(schema).format(format).load(tempDir).collect()
+      }
+
+      Seq("parquet", "csv").foreach { format =>
+        // write path
+        var msg = intercept[UnsupportedOperationException] {
+          sql("select 
null").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+
+        msg = intercept[UnsupportedOperationException] {
+          spark.udf.register("testType", () => new NullData())
+          sql("select 
testType()").write.format(format).mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+
+        // read path
+        msg = intercept[UnsupportedOperationException] {
+          val schema = StructType(StructField("a", NullType, true) :: Nil)
+          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+          spark.read.schema(schema).format(format).load(tempDir).collect()
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+
+        msg = intercept[UnsupportedOperationException] {
+          val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+          spark.read.schema(schema).format(format).load(tempDir).collect()
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"$format data source does not support null data type."))
+      }
+    }
+  }
+}
+
+object TestingUDT {
+
+  @SQLUserDefinedType(udt = classOf[IntervalUDT])
+  class IntervalData extends Serializable
+
+  class IntervalUDT extends UserDefinedType[IntervalData] {
+
+    override def sqlType: DataType = CalendarIntervalType
+    override def serialize(obj: IntervalData): Any =
+      throw new NotImplementedError("Not implemented")
+    override def deserialize(datum: Any): IntervalData =
+      throw new NotImplementedError("Not implemented")
+    override def userClass: Class[IntervalData] = classOf[IntervalData]
+  }
+
+  @SQLUserDefinedType(udt = classOf[NullUDT])
+  private[sql] class NullData extends Serializable
+
+  private[sql] class NullUDT extends UserDefinedType[NullData] {
+
+    override def sqlType: DataType = NullType
+    override def serialize(obj: NullData): Any = throw new NotImplementedError("Not implemented")
+    override def deserialize(datum: Any): NullData =
+      throw new NotImplementedError("Not implemented")
+    override def userClass: Class[NullData] = classOf[NullData]
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d2f166c..365239d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -740,39 +740,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(numbers.count() == 8)
   }
 
-  test("error handling for unsupported data types.") {
-    withTempDir { dir =>
-      val csvDir = new File(dir, "csv").getCanonicalPath
-      var msg = intercept[UnsupportedOperationException] {
-        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, 
b)").write.csv(csvDir)
-      }.getMessage
-      assert(msg.contains("CSV data source does not support 
struct<a:int,b:string> data type"))
-
-      msg = intercept[UnsupportedOperationException] {
-        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
-      }.getMessage
-      assert(msg.contains("CSV data source does not support map<string,int> 
data type"))
-
-      msg = intercept[UnsupportedOperationException] {
-        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", 
"brands").write.csv(csvDir)
-      }.getMessage
-      assert(msg.contains("CSV data source does not support array<string> data 
type"))
-
-      msg = intercept[UnsupportedOperationException] {
-        Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", 
"vectors")
-          .write.csv(csvDir)
-      }.getMessage
-      assert(msg.contains("CSV data source does not support array<double> data 
type"))
-
-      msg = intercept[UnsupportedOperationException] {
-        val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), 
true) :: Nil)
-        spark.range(1).write.csv(csvDir)
-        spark.read.schema(schema).csv(csvDir).collect()
-      }.getMessage
-      assert(msg.contains("CSV data source does not support array<double> data 
type."))
-    }
-  }
-
   test("SPARK-15585 turn off quotations") {
     val cars = spark.read
       .format("csv")

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 237ed9b..dd2144c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -72,6 +72,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    DataSourceUtils.verifyWriteSchema(this, dataSchema)
+
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
 
     val configuration = job.getConfiguration
@@ -121,6 +123,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       filters: Seq[Filter],
       options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    DataSourceUtils.verifyReadSchema(this, dataSchema)
+
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/893ea224/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index d556a03..69009e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.orc.OrcSuite
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
@@ -133,4 +135,49 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
       Utils.deleteRecursively(location)
     }
   }
+
+  test("SPARK-24204 error handling for unsupported data types") {
+    withTempDir { dir =>
+      val orcDir = new File(dir, "orc").getCanonicalPath
+
+      // write path
+      var msg = intercept[AnalysisException] {
+        sql("select interval 1 days").write.mode("overwrite").orc(orcDir)
+      }.getMessage
+      assert(msg.contains("Cannot save interval data type into external 
storage."))
+
+      msg = intercept[UnsupportedOperationException] {
+        sql("select null").write.mode("overwrite").orc(orcDir)
+      }.getMessage
+      assert(msg.contains("ORC data source does not support null data type."))
+
+      msg = intercept[UnsupportedOperationException] {
+        spark.udf.register("testType", () => new IntervalData())
+        sql("select testType()").write.mode("overwrite").orc(orcDir)
+      }.getMessage
+      assert(msg.contains("ORC data source does not support calendarinterval 
data type."))
+
+      // read path
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType(StructField("a", CalendarIntervalType, true) 
:: Nil)
+        spark.range(1).write.mode("overwrite").orc(orcDir)
+        spark.read.schema(schema).orc(orcDir).collect()
+      }.getMessage
+      assert(msg.contains("ORC data source does not support calendarinterval 
data type."))
+
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType(StructField("a", NullType, true) :: Nil)
+        spark.range(1).write.mode("overwrite").orc(orcDir)
+        spark.read.schema(schema).orc(orcDir).collect()
+      }.getMessage
+      assert(msg.contains("ORC data source does not support null data type."))
+
+      msg = intercept[UnsupportedOperationException] {
+        val schema = StructType(StructField("a", new IntervalUDT(), true) :: 
Nil)
+        spark.range(1).write.mode("overwrite").orc(orcDir)
+        spark.read.schema(schema).orc(orcDir).collect()
+      }.getMessage
+      assert(msg.contains("ORC data source does not support calendarinterval 
data type."))
+    }
+  }
 }

