This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f8c87f03297  [SPARK-43380][SQL] Fix Avro data type conversion issues without causing performance regression
f8c87f03297 is described below

commit f8c87f03297e2770e2944e8e8fe097b75f9e8fea
Author: zeruibao <zerui....@databricks.com>
AuthorDate: Wed Sep 27 16:42:35 2023 +0800

    [SPARK-43380][SQL] Fix Avro data type conversion issues without causing performance regression

    ### What changes were proposed in this pull request?
    My last PR https://github.com/apache/spark/pull/41052 caused an Avro read performance regression because it changed the code structure, turning one match case into a nested match case. This PR fixes the Avro data type conversion issues in another way to avoid that regression.

    Original change: we introduced the SQLConf `spark.sql.legacy.avro.allowReadingWithIncompatibleSchema` to prevent reading interval types as date or timestamp types (which produces corrupt dates) and reading decimal types with insufficient precision.

    ### Why are the changes needed?
    We found the following issues with open-source Avro:
    - Interval types can be read as date or timestamp types, which leads to wildly different results. For example, `Duration.ofDays(1).plusSeconds(1)` is read back as `1972-09-27`.
    - Decimal types can be read with lower precision, which causes the data to be read as `null` instead of suggesting that a wider decimal type should be provided.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing unit tests.

    Closes #42503 from zeruibao/SPARK-4380-real-fix-regression.

    Lead-authored-by: zeruibao <zerui....@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |   5 +
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  46 ++++--
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 158 +++++++++++++++++++++
 docs/sql-error-conditions.md                       |   8 +-
 docs/sql-migration-guide.md                        |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  16 +++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 ++
 7 files changed, 235 insertions(+), 11 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 5d827c67482..dd0190c3462 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -75,6 +75,11 @@
       }
     }
   },
+  "AVRO_INCOMPATIBLE_READ_TYPE" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: \"spark.sql.legacy.avro.allowIncompatibleSchema\"."
+    ]
+  },
   "BATCH_METADATA_NOT_FOUND" : {
     "message" : [
       "Unable to find batch <batchMetadataFile>."
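As an illustration of the behavior change (this sketch is not part of the patch below; it assumes a Spark build that contains this commit, the Avro connector on the classpath, an active `SparkSession` named `spark`, and a writable local path), the following mirrors the new `AvroSuite` test for decimals: a value written as Avro `decimal(12,10)` can no longer be read back with a narrower decimal schema unless the legacy flag is enabled.

import org.apache.spark.sql.SparkSession

// Illustrative only; path and session setup are assumptions, not part of the commit.
val spark = SparkSession.builder()
  .appName("avro-incompatible-read-demo")
  .master("local[*]")
  .getOrCreate()

val path = "/tmp/avro-decimal-demo"
// The literal 13.1234567890 is written as Avro decimal(12,10).
spark.sql("SELECT 13.1234567890 AS a").write.format("avro").mode("overwrite").save(path)

// Default (flag = false): reading with a narrower decimal now fails with
// error class AVRO_INCOMPATIBLE_READ_TYPE instead of silently returning null.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "false")
try {
  spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path).collect()
} catch {
  case e: Exception => println(s"Rejected as expected: ${e.getMessage}")
}

// A decimal that is wide enough still reads fine.
spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path).show()

// Legacy behavior (flag = true): the mismatched read silently returns null again.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "true")
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path).show()

The same flag governs the interval cases: with it disabled, Avro year-month and day-time interval data can no longer be read as date or timestamp columns, as exercised by the other new tests in the diff.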
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index a78ee89a3e9..e82116eec1e 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -117,6 +118,10 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
       s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
 
+    val realDataType = SchemaConverters.toSqlType(avroType).dataType
+    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+
     (avroType.getType, catalystType) match {
       case (NULL, NullType) => (updater, ordinal, _) =>
         updater.setNullAt(ordinal)
@@ -128,9 +133,19 @@ private[sql] class AvroDeserializer(
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, dt: DatetimeType)
+        if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] =>
+        throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+          toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
+      case (LONG, dt: DatetimeType)
+        if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] =>
+        throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+          toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -204,17 +219,30 @@ private[sql] class AvroDeserializer(
         }
         updater.set(ordinal, bytes)
 
-      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+      case (FIXED, dt: DecimalType) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
+        if (preventReadingIncorrectType &&
+          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+          throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+            toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+        }
+        (updater, ordinal, value) =>
+          val bigDecimal =
+            decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+          val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+          updater.setDecimal(ordinal, decimal)
 
-      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+      case (BYTES, dt: DecimalType) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
+        if (preventReadingIncorrectType &&
+          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+          throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+            toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+        }
+        (updater, ordinal, value) =>
+          val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+          val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+          updater.setDecimal(ordinal, decimal)
 
       case (RECORD, st: StructType) =>
         // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d22a2d36975..ffb0a49641b 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.TestUtils.assertExceptionMsg
@@ -814,6 +815,163 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of decimal type to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      // With the flag disabled, we will throw an exception if there is a mismatch
+      withSQLConf(confKey -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+        }
+        ExceptionUtils.getRootCause(e) match {
+          case ex: AnalysisException =>
+            checkError(
+              exception = ex,
+              errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+              parameters = Map("avroPath" -> "field 'a'",
+                "sqlPath" -> "field 'a'",
+                "avroType" -> "decimal\\(12,10\\)",
+                "sqlType" -> "\"DECIMAL\\(4,3\\)\""),
+              matchPVals = true
+            )
+          case other =>
+            fail(s"Received unexpected exception", other)
+        }
+      }
+      // The following used to work, so it should still work with the flag enabled
+      checkAnswer(
+        spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+        Row(new java.math.BigDecimal("13.123"))
+      )
+      withSQLConf(confKey -> "true") {
+        // With the flag enabled, we return a null silently, which isn't great
+        checkAnswer(
+          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+          Row(null)
+        )
+        checkAnswer(
+          spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+          Row(new java.math.BigDecimal("13.123"))
+        )
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of DayTimeIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval day to second",
+                  "sqlType" -> s""""$sqlType""""),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of YearMonthIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval year to month",
+                  "sqlType" -> s""""$sqlType""""),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
+  Seq(
+    "time-millis",
+    "time-micros",
+    "timestamp-micros",
+    "timestamp-millis",
+    "local-timestamp-millis",
+    "local-timestamp-micros"
+  ).foreach { timeLogicalType =>
+    test(s"converting $timeLogicalType type to long in avro") {
+      withTempPath { path =>
+        val df = Seq(100L)
+          .toDF("dt")
+        val avroSchema =
+          s"""
+            |{
+            |  "type" : "record",
+            |  "name" : "test_schema",
+            |  "fields" : [
+            |    {"name": "dt", "type": {"type": "long", "logicalType": "$timeLogicalType"}}
+            |  ]
+            |}""".stripMargin
+        df.write.format("avro").option("avroSchema", avroSchema).save(path.getCanonicalPath)
+        checkAnswer(
+          spark.read.schema(s"dt long").format("avro").load(path.toString),
+          Row(100L))
+      }
+    }
+  }
+
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 38cfc28ba09..660a72dca7d 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -93,6 +93,12 @@ Invalid as-of join.
 
 For more details see [AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)
 
+### AVRO_INCOMPATIBLE_READ_TYPE
+
+SQLSTATE: none assigned
+
+Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: "spark.sql.legacy.avro.allowIncompatibleSchema".
+
 ### BATCH_METADATA_NOT_FOUND
 
 [SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2210,5 +2216,3 @@ The operation `<operation>` requires a `<requiredType>`. But `<objectName>` is a
 The `<functionName>` requires `<expectedNum>` parameters but the actual number is `<actualNum>`.
 
 For more details see [WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)
-
-
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a28f6fd284d..c5d09c19b24 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -36,6 +36,7 @@ license: |
 - Since Spark 3.5, the `plan` field is moved from `AnalysisException` to `EnhancedAnalysisException`.
 - Since Spark 3.5, `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` is enabled by default. To restore the previous behavior, set `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.
 - Since Spark 3.5, the `array_insert` function is 1-based for negative indexes. It inserts new element at the end of input arrays for the index -1. To restore the previous behavior, set `spark.sql.legacy.negativeIndexInArrayInsert` to `true`.
+- Since Spark 3.5, Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`.
 
 ## Upgrading from Spark SQL 3.3 to 3.4
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9d2b1225825..2f2341ab47f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3727,6 +3727,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     )
   }
 
+  def avroIncompatibleReadError(
+      avroPath: String,
+      sqlPath: String,
+      avroType: String,
+      sqlType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType)
+      )
+    )
+  }
+
   def optionMustBeLiteralString(key: String): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index aeef531dbcd..ed887fd5cb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4332,6 +4332,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+      .internal()
+      .doc("When set to false, if types in Avro are encoded in the same format, but " +
+        "the type in the Avro schema explicitly says that the data types are different, " +
+        "reject reading the data type in the format to avoid returning incorrect results. " +
+        "When set to true, it restores the legacy behavior of allowing reading the data in the" +
+        " format, which may return incorrect results.")
+      .version("3.5.1")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org