This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8c87f03297 [SPARK-43380][SQL] Fix Avro data type conversion issues 
without causing performance regression
f8c87f03297 is described below

commit f8c87f03297e2770e2944e8e8fe097b75f9e8fea
Author: zeruibao <zerui....@databricks.com>
AuthorDate: Wed Sep 27 16:42:35 2023 +0800

    [SPARK-43380][SQL] Fix Avro data type conversion issues without causing 
performance regression
    
    ### What changes were proposed in this pull request?
    My last PR https://github.com/apache/spark/pull/41052 caused an Avro read 
performance regression because it changed the code structure: it turned one 
match case into a nested match case. This PR fixes the Avro data type 
conversion issues in another way to avoid this regression.
    
    Original Change:
    We introduce the SQLConf 
`spark.sql.legacy.avro.allowIncompatibleSchema`. Unless it is enabled, reading 
interval types as date or timestamp types is rejected to avoid getting corrupt 
dates, as is reading decimal types with incorrect precision.
    
    ### Why are the changes needed?
    We found the following issues with open source Avro:
    
    - Interval types can be read as date or timestamp types, which leads to 
wildly different results.
       For example, `Duration.ofDays(1).plusSeconds(1)` will be read as 
`1972-09-27`, which is not a meaningful date for an interval value.
    - Decimal types can be read with lower precision, which leads to data being 
read as `null` instead of suggesting that a wider decimal format should be 
provided.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Old unit test
    
    Closes #42503 from zeruibao/SPARK-4380-real-fix-regression.
    
    Lead-authored-by: zeruibao <zerui....@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |   5 +
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  46 ++++--
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 158 +++++++++++++++++++++
 docs/sql-error-conditions.md                       |   8 +-
 docs/sql-migration-guide.md                        |   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  16 +++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 ++
 7 files changed, 235 insertions(+), 11 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 5d827c67482..dd0190c3462 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -75,6 +75,11 @@
       }
     }
   },
+  "AVRO_INCOMPATIBLE_READ_TYPE" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original 
encoded data type is <avroType>, however you're trying to read the field as 
<sqlType>, which would lead to an incorrect answer. To allow reading this 
field, enable the SQL configuration: 
\"spark.sql.legacy.avro.allowIncompatibleSchema\"."
+    ]
+  },
   "BATCH_METADATA_NOT_FOUND" : {
     "message" : [
       "Unable to find batch <batchMetadataFile>."
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index a78ee89a3e9..e82116eec1e 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,8 +35,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -117,6 +118,10 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = 
${catalystType.sql})"
 
+    val realDataType = SchemaConverters.toSqlType(avroType).dataType
+    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+
     (avroType.getType, catalystType) match {
       case (NULL, NullType) => (updater, ordinal, _) =>
         updater.setNullAt(ordinal)
@@ -128,9 +133,19 @@ private[sql] class AvroDeserializer(
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, dt: DatetimeType)
+        if preventReadingIncorrectType && 
realDataType.isInstanceOf[YearMonthIntervalType] =>
+        throw 
QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+          toFieldStr(catalystPath), realDataType.catalogString, 
dt.catalogString)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
+      case (LONG, dt: DatetimeType)
+        if preventReadingIncorrectType && 
realDataType.isInstanceOf[DayTimeIntervalType] =>
+        throw 
QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+          toFieldStr(catalystPath), realDataType.catalogString, 
dt.catalogString)
+
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
 
@@ -204,17 +219,30 @@ private[sql] class AvroDeserializer(
         }
         updater.set(ordinal, bytes)
 
-      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+      case (FIXED, dt: DecimalType) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = 
decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
+        if (preventReadingIncorrectType &&
+          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+          throw 
QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+            toFieldStr(catalystPath), realDataType.catalogString, 
dt.catalogString)
+        }
+        (updater, ordinal, value) =>
+          val bigDecimal =
+            decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], 
avroType, d)
+          val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+          updater.setDecimal(ordinal, decimal)
 
-      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+      case (BYTES, dt: DecimalType) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        val bigDecimal = 
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-        updater.setDecimal(ordinal, decimal)
+        if (preventReadingIncorrectType &&
+          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+          throw 
QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+            toFieldStr(catalystPath), realDataType.catalogString, 
dt.catalogString)
+        }
+        (updater, ordinal, value) =>
+          val bigDecimal = 
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+          val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+          updater.setDecimal(ordinal, decimal)
 
       case (RECORD, st: StructType) =>
         // Avro datasource doesn't accept filters with nested attributes. See 
SPARK-32328.
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d22a2d36975..ffb0a49641b 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, 
GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, 
SparkUpgradeException}
 import org.apache.spark.TestUtils.assertExceptionMsg
@@ -814,6 +815,163 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of decimal type to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      // With the flag disabled, we will throw an exception if there is a 
mismatch
+      withSQLConf(confKey -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.schema("a DECIMAL(4, 
3)").format("avro").load(path.toString).collect()
+        }
+        ExceptionUtils.getRootCause(e) match {
+          case ex: AnalysisException =>
+            checkError(
+              exception = ex,
+              errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+              parameters = Map("avroPath" -> "field 'a'",
+                "sqlPath" -> "field 'a'",
+                "avroType" -> "decimal\\(12,10\\)",
+                "sqlType" -> "\"DECIMAL\\(4,3\\)\""),
+              matchPVals = true
+            )
+          case other =>
+            fail(s"Received unexpected exception", other)
+        }
+      }
+      // The following used to work, so it should still work with the flag 
enabled
+      checkAnswer(
+        spark.read.schema("a DECIMAL(5, 
3)").format("avro").load(path.toString),
+        Row(new java.math.BigDecimal("13.123"))
+      )
+      withSQLConf(confKey -> "true") {
+        // With the flag enabled, we return a null silently, which isn't great
+        checkAnswer(
+          spark.read.schema("a DECIMAL(4, 
3)").format("avro").load(path.toString),
+          Row(null)
+        )
+        checkAnswer(
+          spark.read.schema("a DECIMAL(5, 
3)").format("avro").load(path.toString),
+          Row(new java.math.BigDecimal("13.123"))
+        )
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of DayTimeIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), 
false)))
+      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a 
$sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval day to second",
+                  "sqlType" -> s""""$sqlType""""),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
+  test("SPARK-43380: Fix Avro data type conversion" +
+    " of YearMonthIntervalType to avoid producing incorrect results") {
+    withTempPath { path =>
+      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), 
false)))
+      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.format("avro").save(path.getCanonicalPath)
+
+      withSQLConf(confKey -> "false") {
+        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+          val e = intercept[SparkException] {
+            spark.read.schema(s"a 
$sqlType").format("avro").load(path.toString).collect()
+          }
+
+          ExceptionUtils.getRootCause(e) match {
+            case ex: AnalysisException =>
+              checkError(
+                exception = ex,
+                errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+                parameters = Map("avroPath" -> "field 'a'",
+                  "sqlPath" -> "field 'a'",
+                  "avroType" -> "interval year to month",
+                  "sqlType" -> s""""$sqlType""""),
+                matchPVals = true
+              )
+            case other =>
+              fail(s"Received unexpected exception", other)
+          }
+        }
+      }
+
+      withSQLConf(confKey -> "true") {
+        // Allow conversion and do not need to check result
+        spark.read.schema("a Date").format("avro").load(path.toString)
+        spark.read.schema("a timestamp").format("avro").load(path.toString)
+        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+      }
+    }
+  }
+
+  Seq(
+    "time-millis",
+    "time-micros",
+    "timestamp-micros",
+    "timestamp-millis",
+    "local-timestamp-millis",
+    "local-timestamp-micros"
+  ).foreach { timeLogicalType =>
+    test(s"converting $timeLogicalType type to long in avro") {
+      withTempPath { path =>
+        val df = Seq(100L)
+          .toDF("dt")
+        val avroSchema =
+          s"""
+             |{
+             |  "type" : "record",
+             |  "name" : "test_schema",
+             |  "fields" : [
+             |    {"name": "dt", "type": {"type": "long", "logicalType": 
"$timeLogicalType"}}
+             |  ]
+             |}""".stripMargin
+        df.write.format("avro").option("avroSchema", 
avroSchema).save(path.getCanonicalPath)
+        checkAnswer(
+          spark.read.schema(s"dt long").format("avro").load(path.toString),
+          Row(100L))
+      }
+    }
+  }
+
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 38cfc28ba09..660a72dca7d 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -93,6 +93,12 @@ Invalid as-of join.
 
 For more details see 
[AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)
 
+### AVRO_INCOMPATIBLE_READ_TYPE
+
+SQLSTATE: none assigned
+
+Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original 
encoded data type is `<avroType>`, however you're trying to read the field as 
`<sqlType>`, which would lead to an incorrect answer. To allow reading this 
field, enable the SQL configuration: 
"spark.sql.legacy.avro.allowIncompatibleSchema".
+
 ### BATCH_METADATA_NOT_FOUND
 
 [SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2210,5 +2216,3 @@ The operation `<operation>` requires a `<requiredType>`. 
But `<objectName>` is a
 The `<functionName>` requires `<expectedNum>` parameters but the actual number 
is `<actualNum>`.
 
 For more details see 
[WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)
-
-
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a28f6fd284d..c5d09c19b24 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -36,6 +36,7 @@ license: |
 - Since Spark 3.5, the `plan` field is moved from `AnalysisException` to 
`EnhancedAnalysisException`.
 - Since Spark 3.5, `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` 
is enabled by default. To restore the previous behavior, set 
`spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.
 - Since Spark 3.5, the `array_insert` function is 1-based for negative 
indexes. It inserts new element at the end of input arrays for the index -1. To 
restore the previous behavior, set 
`spark.sql.legacy.negativeIndexInArrayInsert` to `true`.
+- Since Spark 3.5, the Avro will throw `AnalysisException` when reading 
Interval types as Date or Timestamp types, or reading Decimal types with lower 
precision. To restore the legacy behavior, set 
`spark.sql.legacy.avro.allowIncompatibleSchema` to `true`
 
 ## Upgrading from Spark SQL 3.3 to 3.4
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9d2b1225825..2f2341ab47f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3727,6 +3727,22 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
     )
   }
 
+  def avroIncompatibleReadError(
+      avroPath: String,
+      sqlPath: String,
+      avroType: String,
+      sqlType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType)
+      )
+    )
+  }
+
   def optionMustBeLiteralString(key: String): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index aeef531dbcd..ed887fd5cb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4332,6 +4332,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+      .internal()
+      .doc("When set to false, if types in Avro are encoded in the same 
format, but " +
+        "the type in the Avro schema explicitly says that the data types are 
different, " +
+        "reject reading the data type in the format to avoid returning 
incorrect results. " +
+        "When set to true, it restores the legacy behavior of allow reading 
the data in the" +
+        " format, which may return incorrect results.")
+      .version("3.5.1")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to