This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1beed0d  [SPARK-26765][SQL] Avro: Validate input and output schema
1beed0d is described below

commit 1beed0d7c253da4fde12bfeea96cc0fbcc1aae25
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Wed Jan 30 00:17:33 2019 +0800

    [SPARK-26765][SQL] Avro: Validate input and output schema

    ## What changes were proposed in this pull request?

    The API `supportDataType` in `FileFormat` validates the input/output
    schema before execution starts, so that invalid data source IO is
    avoided and users see clean error messages. This PR overrides the
    validation API in the Avro data source. Also, per the Avro spec
    (https://avro.apache.org/docs/1.8.2/spec.html), the null type is
    supported, so this PR also fixes the handling of `NullType`.

    ## How was this patch tested?

    Unit test

    Closes #23684 from gengliangwang/avroSupportDataType.

    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala | 19 +++++++-
 .../apache/spark/sql/avro/SchemaConverters.scala   |  5 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 50 ++++++++++++----------
 3 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e60fa88..7391665 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, Utils}

 private[avro] class AvroFileFormat extends FileFormat
@@ -243,6 +243,23 @@ private[avro] class AvroFileFormat extends FileFormat
       }
     }
   }
+
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>
+      supportDataType(keyType) && supportDataType(valueType)
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
+
+    case _: NullType => true
+
+    case _ => false
+  }
 }

 private[avro] object AvroFileFormat {
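[The new override amounts to a recursive walk over Catalyst types. A minimal, self-contained sketch of the same logic with example results, written as if inside Spark's source tree (where `UserDefinedType` is visible); the helper name `supported` is illustrative, not part of the patch:

    import org.apache.spark.sql.types._

    // Recursive analogue of the supportDataType override above: atomic and
    // null types are writable; containers and UDTs are checked element-wise.
    def supported(dt: DataType): Boolean = dt match {
      case _: AtomicType => true
      case st: StructType => st.forall(f => supported(f.dataType))
      case ArrayType(elementType, _) => supported(elementType)
      case MapType(keyType, valueType, _) =>
        supported(keyType) && supported(valueType)
      case udt: UserDefinedType[_] => supported(udt.sqlType)
      case _: NullType => true
      case _ => false
    }

    supported(new StructType().add("a", ArrayType(LongType)))  // true
    supported(MapType(StringType, CalendarIntervalType))       // false

Because the check runs during write analysis, an unsupported leaf type anywhere in a nested schema produces an AnalysisException before any files are opened, as the new tests below verify.]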
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 64127af..3947d32 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -70,6 +70,8 @@ object SchemaConverters {

     case ENUM => SchemaType(StringType, nullable = false)

+    case NULL => SchemaType(NullType, nullable = true)
+
     case RECORD =>
       if (existingRecordNames.contains(avroSchema.getFullName)) {
         throw new IncompatibleSchemaException(s"""
@@ -151,6 +153,7 @@ object SchemaConverters {
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
       case StringType => builder.stringType()
+      case NullType => builder.nullType()
       case d: DecimalType =>
         val avroType = LogicalTypes.decimal(d.precision, d.scale)
         val fixedSize = minBytesForPrecision(d.precision)
@@ -181,7 +184,7 @@ object SchemaConverters {
       // This should never happen.
       case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
     }
-    if (nullable) {
+    if (nullable && catalystType != NullType) {
       Schema.createUnion(schema, nullSchema)
     } else {
       schema
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 207c54c..d803537 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{TimeZone, UUID}
+import java.util.{Locale, TimeZone, UUID}

 import scala.collection.JavaConverters._

@@ -35,6 +35,7 @@ import org.apache.commons.io.FileUtils

 import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -135,27 +136,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

-  test("test NULL avro type") {
-    withTempPath { dir =>
-      val fields =
-        Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava
-      val schema = Schema.createRecord("name", "docs", "namespace", false)
-      schema.setFields(fields)
-      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
-      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
-      dataFileWriter.create(schema, new File(s"$dir.avro"))
-      val avroRec = new GenericData.Record(schema)
-      avroRec.put("null", null)
-      dataFileWriter.append(avroRec)
-      dataFileWriter.flush()
-      dataFileWriter.close()
-
-      intercept[IncompatibleSchemaException] {
-        spark.read.format("avro").load(s"$dir.avro")
-      }
-    }
-  }
-
   test("union(int, long) is read as long") {
     withTempPath { dir =>
       val avroSchema: Schema = {
@@ -903,6 +883,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

+  test("error handling for unsupported Interval data types") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      var msg = intercept[AnalysisException] {
+        sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
+      }.getMessage
+      assert(msg.contains("Cannot save interval data type into external storage."))
+
+      msg = intercept[AnalysisException] {
+        spark.udf.register("testType", () => new IntervalData())
+        sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
+      }.getMessage
+      assert(msg.toLowerCase(Locale.ROOT)
+        .contains(s"data source does not support calendarinterval data type."))
+    }
+  }
+
+  test("support Null data types") {
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      val df = sql("select null")
+      df.write.format("avro").mode("overwrite").save(tempDir)
+      checkAnswer(spark.read.format("avro").load(tempDir), df)
+    }
+  }
+
   test("throw exception if unable to write with user provided Avro schema") {
     val input: Seq[(DataType, Schema.Type)] = Seq(
       (NullType, NULL),
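[Taken together, the converter and test changes make `NullType` columns round-trip. A minimal sketch of what the new "support Null data types" test exercises, assuming a local SparkSession named `spark` with the avro module on the classpath; the output path is hypothetical:

    val df = spark.sql("select null as n")  // a single column of NullType
    df.write.format("avro").mode("overwrite").save("/tmp/avro_null_demo")
    spark.read.format("avro").load("/tmp/avro_null_demo").printSchema()
    // root
    //  |-- n: null (nullable = true)

The `nullable && catalystType != NullType` guard above is what makes this write legal: `NullType` already maps to Avro's null type, and wrapping it in the usual nullable union would yield ["null", "null"], which the Avro spec rejects because a union may not contain two schemas of the same type.]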
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org