This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0e5812c49d2 [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas 0e5812c49d2 is described below commit 0e5812c49d2552d8779f94fbaad2fc1b69d8a9e8 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Fri Aug 5 11:25:51 2022 +0800 [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas ### What changes were proposed in this pull request? This PR disables validation of default values when parsing Avro schemas. ### Why are the changes needed? After upgrading to Spark 3.2, Spark throws an exception when parsing an Avro schema in which a field has an invalid default value (for example, a `null` default for a `long` field). The same problem was fixed earlier for Hive serde tables in SPARK-34512. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #37191 from wangyum/SPARK-39775. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 5c1b99f441ec5e178290637a9a9e7902aaa116e1) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/serializer/GenericAvroSerializer.scala | 4 +-- .../serializer/GenericAvroSerializerSuite.scala | 16 +++++++++++ .../apache/spark/sql/avro/AvroDataToCatalyst.scala | 3 +- .../org/apache/spark/sql/avro/AvroOptions.scala | 4 +-- .../apache/spark/sql/avro/CatalystDataToAvro.scala | 2 +- .../apache/spark/sql/avro/AvroFunctionsSuite.scala | 32 ++++++++++++++++++++++ 6 files changed, 55 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index c1ef3ee769a..7d2923fdf37 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -97,7 +97,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer] } { in.close() } - new Schema.Parser().parse(new String(bytes, 
StandardCharsets.UTF_8)) + new Schema.Parser().setValidateDefaults(false).parse(new String(bytes, StandardCharsets.UTF_8)) }) /** @@ -137,7 +137,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer] val fingerprint = input.readLong() schemaCache.getOrElseUpdate(fingerprint, { schemas.get(fingerprint) match { - case Some(s) => new Schema.Parser().parse(s) + case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s) case None => throw new SparkException( "Error reading attempting to read avro data -- encountered an unknown " + diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 54e4aebe544..98493c12f59 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -110,4 +110,20 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(rdd.collect() sameElements Array.fill(10)(datum)) } } + + test("SPARK-39775: Disable validate default values when parsing Avro schemas") { + val avroTypeStruct = s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + | {"name": "id", "type": "long", "default": null} + | ] + |} + """.stripMargin + val schema = new Schema.Parser().setValidateDefaults(false).parse(avroTypeStruct) + + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) + assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) + } } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index b4965003ba3..c4a4b16b052 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -53,7 +53,8 @@ private[avro] case class AvroDataToCatalyst( private lazy val avroOptions = AvroOptions(options) - @transient private lazy val actualSchema = new Schema.Parser().parse(jsonFormatSchema) + @transient private lazy val actualSchema = + new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema) @transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 9fe50079b24..a505a1656fc 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -52,13 +52,13 @@ private[sql] class AvroOptions( * instead of "string" type in the default converted schema. */ val schema: Option[Schema] = { - parameters.get("avroSchema").map(new Schema.Parser().parse).orElse({ + parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({ val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => { log.debug("loading avro schema from url: " + url) val fs = FileSystem.get(new URI(url), conf) val in = fs.open(new Path(url)) try { - new Schema.Parser().parse(in) + new Schema.Parser().setValidateDefaults(false).parse(in) } finally { in.close() } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala index 5d79c44ad42..1e7e8600977 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala @@ -35,7 +35,7 @@ private[avro] case class CatalystDataToAvro( @transient private lazy val avroType = jsonFormatSchema - .map(new 
Schema.Parser().parse) + .map(new Schema.Parser().setValidateDefaults(false).parse) .getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable)) @transient private lazy val serializer = diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index c9e0d434469..69cda3efb52 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType class AvroFunctionsSuite extends QueryTest with SharedSparkSession { import testImplicits._ @@ -238,4 +239,35 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { assert(message.contains("Only UNION of a null type and a non-null type is supported")) } } + + test("SPARK-39775: Disable validate default values when parsing Avro schemas") { + val avroTypeStruct = s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + | {"name": "id", "type": "long", "default": null} + | ] + |} + """.stripMargin + val avroSchema = AvroOptions(Map("avroSchema" -> avroTypeStruct)).schema.get + val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] + + val df = spark.range(5).select($"id") + val structDf = df.select(struct($"id").as("struct")) + val avroStructDF = structDf.select(functions.to_avro('struct, avroTypeStruct).as("avro")) + checkAnswer(avroStructDF.select(functions.from_avro('avro, avroTypeStruct)), structDf) + + withTempPath { dir => + df.write.format("avro").save(dir.getCanonicalPath) + 
checkAnswer(spark.read.schema(sparkSchema).format("avro").load(dir.getCanonicalPath), df) + + val msg = intercept[SparkException] { + spark.read.option("avroSchema", avroTypeStruct).format("avro") + .load(dir.getCanonicalPath) + .collect() + }.getCause.getMessage + assert(msg.contains("Invalid default for field id: null not a \"long\"")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org