This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1beed0d  [SPARK-26765][SQL] Avro: Validate input and output schema
1beed0d is described below

commit 1beed0d7c253da4fde12bfeea96cc0fbcc1aae25
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Wed Jan 30 00:17:33 2019 +0800

    [SPARK-26765][SQL] Avro: Validate input and output schema
    
    ## What changes were proposed in this pull request?
    
    The API `supportDataType` in `FileFormat` helps validate the input/output
    schema before execution starts, so that we can avoid invalid data source IO
    and users see clean error messages.
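    
    For illustration, a minimal sketch of the fail-fast behavior (assuming a
    `spark` session with the Avro module on the classpath; the output path is
    hypothetical):
    
    ```scala
    // Illustrative only: interval data cannot be written by the Avro source,
    // so the write is rejected with a clear message before any IO happens
    // (per the new "error handling for unsupported Interval data types" test).
    import org.apache.spark.sql.AnalysisException
    
    try {
      spark.sql("select interval 1 days").write.format("avro").save("/tmp/out")
    } catch {
      case e: AnalysisException =>
        // e.g. "Cannot save interval data type into external storage."
        println(e.getMessage)
    }
    ```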
    
    This PR overrides the validation API in the Avro data source.
    Also, per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html),
    `NullType` is supported. This PR fixes the handling of `NullType`.
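    
    A minimal sketch of the intended mapping (assuming the public
    `SchemaConverters.toAvroType` entry point):
    
    ```scala
    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.NullType
    
    // Catalyst's NullType maps to Avro's `null` type. It is not wrapped in a
    // union even when nullable, since union(null, null) is not a valid Avro
    // schema.
    val avroType = SchemaConverters.toAvroType(NullType, nullable = true)
    assert(avroType.getType == Schema.Type.NULL)
    ```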
    
    ## How was this patch tested?
    
    Unit tests.
    
    Closes #23684 from gengliangwang/avroSupportDataType.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala | 19 +++++++-
 .../apache/spark/sql/avro/SchemaConverters.scala   |  5 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 50 ++++++++++++----------
 3 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e60fa88..7391665 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[avro] class AvroFileFormat extends FileFormat
@@ -243,6 +243,23 @@ private[avro] class AvroFileFormat extends FileFormat
       }
     }
   }
+
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>
+      supportDataType(keyType) && supportDataType(valueType)
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
+
+    case _: NullType => true
+
+    case _ => false
+  }
 }
 
 private[avro] object AvroFileFormat {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 64127af..3947d32 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -70,6 +70,8 @@ object SchemaConverters {
 
       case ENUM => SchemaType(StringType, nullable = false)
 
+      case NULL => SchemaType(NullType, nullable = true)
+
       case RECORD =>
         if (existingRecordNames.contains(avroSchema.getFullName)) {
           throw new IncompatibleSchemaException(s"""
@@ -151,6 +153,7 @@ object SchemaConverters {
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
       case StringType => builder.stringType()
+      case NullType => builder.nullType()
       case d: DecimalType =>
         val avroType = LogicalTypes.decimal(d.precision, d.scale)
         val fixedSize = minBytesForPrecision(d.precision)
@@ -181,7 +184,7 @@ object SchemaConverters {
       // This should never happen.
       case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
     }
-    if (nullable) {
+    if (nullable && catalystType != NullType) {
       Schema.createUnion(schema, nullSchema)
     } else {
       schema
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 207c54c..d803537 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{TimeZone, UUID}
+import java.util.{Locale, TimeZone, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,6 +35,7 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -135,27 +136,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
-  test("test NULL avro type") {
-    withTempPath { dir =>
-      val fields =
-        Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("null", null)
-      dataFileWriter.append(avroRec)
-      dataFileWriter.flush()
-      dataFileWriter.close()
-
-      intercept[IncompatibleSchemaException] {
-        spark.read.format("avro").load(s"$dir.avro")
-      }
-    }
-  }
-
   test("union(int, long) is read as long") {
     withTempPath { dir =>
       val avroSchema: Schema = {
@@ -903,6 +883,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("error handling for unsupported Interval data types") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      var msg = intercept[AnalysisException] {
+        sql("select interval 1 
days").write.format("avro").mode("overwrite").save(tempDir)
+      }.getMessage
+      assert(msg.contains("Cannot save interval data type into external 
storage."))
+
+      msg = intercept[AnalysisException] {
+        spark.udf.register("testType", () => new IntervalData())
+        sql("select 
testType()").write.format("avro").mode("overwrite").save(tempDir)
+      }.getMessage
+      assert(msg.toLowerCase(Locale.ROOT)
+        .contains(s"data source does not support calendarinterval data type."))
+    }
+  }
+
+  test("support Null data types") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      val df = sql("select null")
+      df.write.format("avro").mode("overwrite").save(tempDir)
+      checkAnswer(spark.read.format("avro").load(tempDir), df)
+    }
+  }
+
   test("throw exception if unable to write with user provided Avro schema") {
     val input: Seq[(DataType, Schema.Type)] = Seq(
       (NullType, NULL),

