This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 46580ab4cb0 [SPARK-43380][SQL] Revert `Fix Avro data type conversion 
issues`
46580ab4cb0 is described below

commit 46580ab4cb02390ba71dace1235015749f048fff
Author: zeruibao <zerui....@databricks.com>
AuthorDate: Mon Aug 14 19:36:40 2023 -0700

    [SPARK-43380][SQL] Revert `Fix Avro data type conversion issues`
    
    ### What changes were proposed in this pull request?
    Revert my last PR https://github.com/apache/spark/pull/41052 that causes 
AVRO read performance regression since I change the code structure.
    
    ### Why are the changes needed?
    Remove performance regression
    
    ### How was this patch tested?
    Unit test
    
    Closes #42458 from zeruibao/revert-avro-change.
    
    Authored-by: zeruibao <zerui....@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../src/main/resources/error/error-classes.json    |  10 -
 .../apache/spark/sql/avro/AvroDeserializer.scala   | 456 +++++++++------------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 161 --------
 docs/sql-error-conditions.md                       |  14 -
 docs/sql-migration-guide.md                        |   1 -
 .../spark/sql/errors/QueryCompilationErrors.scala  |  30 --
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 -
 7 files changed, 189 insertions(+), 495 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 08f79bcecbb..6ce24bc3b90 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -75,16 +75,6 @@
       }
     }
   },
-  "AVRO_INCORRECT_TYPE" : {
-    "message" : [
-      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original 
encoded data type is <avroType>, however you're trying to read the field as 
<sqlType>, which would lead to an incorrect answer. To allow reading this 
field, enable the SQL configuration: <key>."
-    ]
-  },
-  "AVRO_LOWER_PRECISION" : {
-    "message" : [
-      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original 
encoded data type is <avroType>, however you're trying to read the field as 
<sqlType>, which leads to data being read as null. Please provide a wider 
decimal type to get the correct result. To allow reading null to this field, 
enable the SQL configuration: <key>."
-    ]
-  },
   "BATCH_METADATA_NOT_FOUND" : {
     "message" : [
       "Unable to find batch <batchMetadataFile>."
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index d4d34a891e9..a78ee89a3e9 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,9 +35,8 @@ import 
org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -118,268 +117,178 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
         s"schema is incompatible (avroType = $avroType, sqlType = 
${catalystType.sql})"
 
-    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
-    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+    (avroType.getType, catalystType) match {
+      case (NULL, NullType) => (updater, ordinal, _) =>
+        updater.setNullAt(ordinal)
 
-    val logicalDataType = SchemaConverters.toSqlType(avroType).dataType
-    avroType.getType match {
-      case NULL =>
-        (logicalDataType, catalystType) match {
-          case (_, NullType) => (updater, ordinal, _) =>
-            updater.setNullAt(ordinal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
       // TODO: we can avoid boxing if future version of avro provide primitive 
accessors.
-      case BOOLEAN =>
-        (logicalDataType, catalystType) match {
-          case (_, BooleanType) => (updater, ordinal, value) =>
-            updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
+      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
+        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+
+      case (INT, IntegerType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (INT, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+
+      case (LONG, LongType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
+      case (LONG, TimestampType) => avroType.getLogicalType match {
+        // For backward compatibility, if the Avro type is Long and it is not 
logical type
+        // (the `null` case), the value is processed as timestamp type with 
millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
+        case _: TimestampMicros => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"Avro logical type $other cannot be converted to SQL type 
${TimestampType.sql}.")
+      }
 
-      case INT =>
-        (logicalDataType, catalystType) match {
-          case (IntegerType, IntegerType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case (IntegerType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-          case (DateType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-          case (_: YearMonthIntervalType, _: YearMonthIntervalType) => 
(updater, ordinal, value) =>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case (_: YearMonthIntervalType, _) if preventReadingIncorrectType =>
-            throw QueryCompilationErrors.avroIncorrectTypeError(
-              toFieldStr(avroPath), toFieldStr(catalystPath),
-              logicalDataType.catalogString, catalystType.catalogString, 
confKey.key)
-          case _ if !preventReadingIncorrectType => (updater, ordinal, value) 
=>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case LONG =>
-        (logicalDataType, catalystType) match {
-          case (LongType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (TimestampType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (TimestampNTZType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (LongType, TimestampType)
-               | (TimestampType, TimestampType)
-               |(TimestampNTZType, TimestampType) => avroType.getLogicalType 
match {
-            // For backward compatibility, if the Avro type is Long and it is 
not logical type
-            // (the `null` case), the value is processed as timestamp type with
-            // millisecond precision.
-            case null | _: TimestampMillis => (updater, ordinal, value) =>
-              val millis = value.asInstanceOf[Long]
-              val micros = DateTimeUtils.millisToMicros(millis)
-              updater.setLong(ordinal, timestampRebaseFunc(micros))
-            case _: TimestampMicros => (updater, ordinal, value) =>
-              val micros = value.asInstanceOf[Long]
-              updater.setLong(ordinal, timestampRebaseFunc(micros))
-            case other => throw new IncompatibleSchemaException(errorPrefix +
-              s"Avro logical type $other cannot be converted to SQL type 
${TimestampType.sql}.")
-          }
-          case (LongType, TimestampNTZType)
-               | (TimestampNTZType, TimestampNTZType)
-               | (TimestampType, TimestampNTZType) => avroType.getLogicalType 
match {
-            // To keep consistent with TimestampType, if the Avro type is Long 
and it is not
-            // logical type (the `null` case), the value is processed as 
TimestampNTZ
-            // with millisecond precision.
-            case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
-              val millis = value.asInstanceOf[Long]
-              val micros = DateTimeUtils.millisToMicros(millis)
-              updater.setLong(ordinal, micros)
-            case _: LocalTimestampMicros => (updater, ordinal, value) =>
-              val micros = value.asInstanceOf[Long]
-              updater.setLong(ordinal, micros)
-            case other => throw new IncompatibleSchemaException(errorPrefix +
-              s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
-          }
-          // Before we upgrade Avro to 1.8 for logical type support,
-          // spark-avro converts Long to Date.
-          // For backward compatibility, we still keep this conversion.
-          case (LongType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, (value.asInstanceOf[Long] / 
MILLIS_PER_DAY).toInt)
-          case (DateType, DateType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (_: DayTimeIntervalType, _: DayTimeIntervalType) => (updater, 
ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (_: DayTimeIntervalType, _) if preventReadingIncorrectType =>
-            throw QueryCompilationErrors.avroIncorrectTypeError(
-              toFieldStr(avroPath), toFieldStr(catalystPath),
-              logicalDataType.catalogString, catalystType.catalogString, 
confKey.key)
-          case (_: DayTimeIntervalType, DateType) => (updater, ordinal, value) 
=>
-            updater.setInt(ordinal, (value.asInstanceOf[Long] / 
MILLIS_PER_DAY).toInt)
-          case (_, dt: DecimalType) => (updater, ordinal, value) =>
-            val d = avroType.getLogicalType.asInstanceOf[CustomDecimal]
-            updater.setDecimal(ordinal, Decimal(value.asInstanceOf[Long], 
d.precision, d.scale))
-          case _ if !preventReadingIncorrectType => (updater, ordinal, value) 
=>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case FLOAT =>
-        (logicalDataType, catalystType) match {
-          case (_, FloatType) => (updater, ordinal, value) =>
-            updater.setFloat(ordinal, value.asInstanceOf[Float])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case DOUBLE =>
-        (logicalDataType, catalystType) match {
-          case (_, DoubleType) => (updater, ordinal, value) =>
-            updater.setDouble(ordinal, value.asInstanceOf[Double])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case STRING =>
-        (logicalDataType, catalystType) match {
-          case (_, StringType) => (updater, ordinal, value) =>
-            val str = value match {
-              case s: String => UTF8String.fromString(s)
-              case s: Utf8 =>
-                val bytes = new Array[Byte](s.getByteLength)
-                System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
-                UTF8String.fromBytes(bytes)
-            }
-            updater.set(ordinal, str)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case ENUM =>
-        (logicalDataType, catalystType) match {
-          case (_, StringType) => (updater, ordinal, value) =>
-            updater.set(ordinal, UTF8String.fromString(value.toString))
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case FIXED =>
-        (logicalDataType, catalystType) match {
-          case (_, BinaryType) => (updater, ordinal, value) =>
-            updater.set(ordinal, 
value.asInstanceOf[GenericFixed].bytes().clone())
-          case (_, dt: DecimalType) =>
-            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-            if (preventReadingIncorrectType &&
-              d.getPrecision - d.getScale > dt.precision - dt.scale) {
-              throw 
QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
-                toFieldStr(catalystPath), logicalDataType.catalogString,
-                dt.catalogString, confKey.key)
-            }
-            (updater, ordinal, value) =>
-              val bigDecimal =
-                decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], 
avroType, d)
-              val decimal = createDecimal(bigDecimal, d.getPrecision, 
d.getScale)
-              updater.setDecimal(ordinal, decimal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case BYTES =>
-        (logicalDataType, catalystType) match {
-          case (_, BinaryType) => (updater, ordinal, value) =>
-            val bytes = value match {
-              case b: ByteBuffer =>
-                val bytes = new Array[Byte](b.remaining)
-                b.get(bytes)
-                // Do not forget to reset the position
-                b.rewind()
-                bytes
-              case b: Array[Byte] => b
-              case other =>
-                throw new RuntimeException(errorPrefix + s"$other is not a 
valid avro binary.")
-            }
-            updater.set(ordinal, bytes)
-          case (_, dt: DecimalType) =>
-            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-            if (preventReadingIncorrectType &&
-              d.getPrecision - d.getScale > dt.precision - dt.scale) {
-              throw 
QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
-                toFieldStr(catalystPath), logicalDataType.catalogString,
-                dt.catalogString, confKey.key)
-            }
-            (updater, ordinal, value) =>
-              val bigDecimal = decimalConversions
-                .fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-              val decimal = createDecimal(bigDecimal, d.getPrecision, 
d.getScale)
-              updater.setDecimal(ordinal, decimal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case (LONG, TimestampNTZType) => avroType.getLogicalType match {
+        // To keep consistent with TimestampType, if the Avro type is Long and 
it is not
+        // logical type (the `null` case), the value is processed as 
TimestampNTZ
+        // with millisecond precision.
+        case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          updater.setLong(ordinal, micros)
+        case _: LocalTimestampMicros => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          updater.setLong(ordinal, micros)
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
+      }
+
+      // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
+      // For backward compatibility, we still keep this conversion.
+      case (LONG, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, (value.asInstanceOf[Long] / 
MILLIS_PER_DAY).toInt)
+
+      case (FLOAT, FloatType) => (updater, ordinal, value) =>
+        updater.setFloat(ordinal, value.asInstanceOf[Float])
+
+      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
+        updater.setDouble(ordinal, value.asInstanceOf[Double])
+
+      case (STRING, StringType) => (updater, ordinal, value) =>
+        val str = value match {
+          case s: String => UTF8String.fromString(s)
+          case s: Utf8 =>
+            val bytes = new Array[Byte](s.getByteLength)
+            System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
+            UTF8String.fromBytes(bytes)
         }
-      case RECORD =>
-        (logicalDataType, catalystType) match {
-          case (_, st: StructType) =>
-            // Avro datasource doesn't accept filters with nested attributes. 
See SPARK-32328.
-            // We can always return `false` from `applyFilters` for nested 
records.
-            val writeRecord =
-              getRecordWriter(avroType, st, avroPath, catalystPath, 
applyFilters = _ => false)
-            (updater, ordinal, value) =>
-              val row = new SpecificInternalRow(st)
-              writeRecord(new RowUpdater(row), 
value.asInstanceOf[GenericRecord])
-              updater.set(ordinal, row)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        updater.set(ordinal, str)
+
+      case (ENUM, StringType) => (updater, ordinal, value) =>
+        updater.set(ordinal, UTF8String.fromString(value.toString))
+
+      case (FIXED, BinaryType) => (updater, ordinal, value) =>
+        updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
+
+      case (BYTES, BinaryType) => (updater, ordinal, value) =>
+        val bytes = value match {
+          case b: ByteBuffer =>
+            val bytes = new Array[Byte](b.remaining)
+            b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
+            bytes
+          case b: Array[Byte] => b
+          case other =>
+            throw new RuntimeException(errorPrefix + s"$other is not a valid 
avro binary.")
         }
-      case ARRAY =>
-        (logicalDataType, catalystType) match {
-          case (_, ArrayType(elementType, containsNull)) =>
-            val avroElementPath = avroPath :+ "element"
-            val elementWriter = newWriter(avroType.getElementType, elementType,
-              avroElementPath, catalystPath :+ "element")
-            (updater, ordinal, value) =>
-              val collection = value.asInstanceOf[java.util.Collection[Any]]
-              val result = createArrayData(elementType, collection.size())
-              val elementUpdater = new ArrayDataUpdater(result)
-
-              var i = 0
-              val iter = collection.iterator()
-              while (iter.hasNext) {
-                val element = iter.next()
-                if (element == null) {
-                  if (!containsNull) {
-                    throw new RuntimeException(
-                      s"Array value at path ${toFieldStr(avroElementPath)}" +
-                        s" is not allowed to be null")
-                  } else {
-                    elementUpdater.setNullAt(i)
-                  }
-                } else {
-                  elementWriter(elementUpdater, i, element)
-                }
-                i += 1
+        updater.set(ordinal, bytes)
+
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = 
decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+        updater.setDecimal(ordinal, decimal)
+
+      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = 
decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+        updater.setDecimal(ordinal, decimal)
+
+      case (RECORD, st: StructType) =>
+        // Avro datasource doesn't accept filters with nested attributes. See 
SPARK-32328.
+        // We can always return `false` from `applyFilters` for nested records.
+        val writeRecord =
+          getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = 
_ => false)
+        (updater, ordinal, value) =>
+          val row = new SpecificInternalRow(st)
+          writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
+          updater.set(ordinal, row)
+
+      case (ARRAY, ArrayType(elementType, containsNull)) =>
+        val avroElementPath = avroPath :+ "element"
+        val elementWriter = newWriter(avroType.getElementType, elementType,
+          avroElementPath, catalystPath :+ "element")
+        (updater, ordinal, value) =>
+          val collection = value.asInstanceOf[java.util.Collection[Any]]
+          val result = createArrayData(elementType, collection.size())
+          val elementUpdater = new ArrayDataUpdater(result)
+
+          var i = 0
+          val iter = collection.iterator()
+          while (iter.hasNext) {
+            val element = iter.next()
+            if (element == null) {
+              if (!containsNull) {
+                throw new RuntimeException(
+                  s"Array value at path ${toFieldStr(avroElementPath)} is not 
allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
               }
-              updater.set(ordinal, result)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case MAP =>
-        (logicalDataType, catalystType) match {
-          case (_, MapType(keyType, valueType, valueContainsNull))
-            if keyType == StringType =>
-            val keyWriter = newWriter(SchemaBuilder.builder().stringType(), 
StringType,
-              avroPath :+ "key", catalystPath :+ "key")
-            val valueWriter = newWriter(avroType.getValueType, valueType,
-              avroPath :+ "value", catalystPath :+ "value")
-            (updater, ordinal, value) =>
-              val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
-              val keyArray = createArrayData(keyType, map.size())
-              val keyUpdater = new ArrayDataUpdater(keyArray)
-              val valueArray = createArrayData(valueType, map.size())
-              val valueUpdater = new ArrayDataUpdater(valueArray)
-              val iter = map.entrySet().iterator()
-              var i = 0
-              while (iter.hasNext) {
-                val entry = iter.next()
-                assert(entry.getKey != null)
-                keyWriter(keyUpdater, i, entry.getKey)
-                if (entry.getValue == null) {
-                  if (!valueContainsNull) {
-                    throw new RuntimeException(
-                      s"Map value at path ${toFieldStr(avroPath :+ "value")}" +
-                        s" is not allowed to be null")
-                  } else {
-                    valueUpdater.setNullAt(i)
-                  }
-                } else {
-                  valueWriter(valueUpdater, i, entry.getValue)
-                }
-                i += 1
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          updater.set(ordinal, result)
+
+      case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == 
StringType =>
+        val keyWriter = newWriter(SchemaBuilder.builder().stringType(), 
StringType,
+          avroPath :+ "key", catalystPath :+ "key")
+        val valueWriter = newWriter(avroType.getValueType, valueType,
+          avroPath :+ "value", catalystPath :+ "value")
+        (updater, ordinal, value) =>
+          val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
+          val keyArray = createArrayData(keyType, map.size())
+          val keyUpdater = new ArrayDataUpdater(keyArray)
+          val valueArray = createArrayData(valueType, map.size())
+          val valueUpdater = new ArrayDataUpdater(valueArray)
+          val iter = map.entrySet().iterator()
+          var i = 0
+          while (iter.hasNext) {
+            val entry = iter.next()
+            assert(entry.getKey != null)
+            keyWriter(keyUpdater, i, entry.getKey)
+            if (entry.getValue == null) {
+              if (!valueContainsNull) {
+                throw new RuntimeException(
+                  s"Map value at path ${toFieldStr(avroPath :+ "value")} is 
not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
               }
-              // The Avro map will never have null or duplicated map keys, 
it's safe to create a
-              // ArrayBasedMapData directly here.
-              updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case UNION =>
+            } else {
+              valueWriter(valueUpdater, i, entry.getValue)
+            }
+            i += 1
+          }
+
+          // The Avro map will never have null or duplicated map keys, it's 
safe to create a
+          // ArrayBasedMapData directly here.
+          updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
+
+      case (UNION, _) =>
         val nonNullTypes = nonNullUnionBranches(avroType)
         val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
         if (nonNullTypes.nonEmpty) {
@@ -388,18 +297,20 @@ private[sql] class AvroDeserializer(
           } else {
             nonNullTypes.map(_.getType).toSeq match {
               case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == 
LongType =>
-                (updater, ordinal, value) => value match {
-                  case null => updater.setNullAt(ordinal)
-                  case l: java.lang.Long => updater.setLong(ordinal, l)
-                  case i: java.lang.Integer => updater.setLong(ordinal, 
i.longValue())
-                }
+                (updater, ordinal, value) =>
+                  value match {
+                    case null => updater.setNullAt(ordinal)
+                    case l: java.lang.Long => updater.setLong(ordinal, l)
+                    case i: java.lang.Integer => updater.setLong(ordinal, 
i.longValue())
+                  }
 
               case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && 
catalystType == DoubleType =>
-                (updater, ordinal, value) => value match {
-                  case null => updater.setNullAt(ordinal)
-                  case d: java.lang.Double => updater.setDouble(ordinal, d)
-                  case f: java.lang.Float => updater.setDouble(ordinal, 
f.doubleValue())
-                }
+                (updater, ordinal, value) =>
+                  value match {
+                    case null => updater.setNullAt(ordinal)
+                    case d: java.lang.Double => updater.setDouble(ordinal, d)
+                    case f: java.lang.Float => updater.setDouble(ordinal, 
f.doubleValue())
+                  }
 
               case _ =>
                 catalystType match {
@@ -423,6 +334,17 @@ private[sql] class AvroDeserializer(
         } else {
           (updater, ordinal, _) => updater.setNullAt(ordinal)
         }
+
+      case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
+      case (LONG, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[CustomDecimal]
+        updater.setDecimal(ordinal, Decimal(value.asInstanceOf[Long], 
d.precision, d.scale))
+
       case _ => throw new IncompatibleSchemaException(incompatibleMsg)
     }
   }
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 516c173c118..d22a2d36975 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,7 +32,6 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, 
GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, 
SparkUpgradeException}
 import org.apache.spark.TestUtils.assertExceptionMsg
@@ -815,166 +814,6 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-43380: Fix Avro data type conversion" +
-      " of decimal type to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
-      // With the flag disabled, we will throw an exception if there is a 
mismatch
-      withSQLConf(confKey -> "false") {
-        val e = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(4, 
3)").format("avro").load(path.toString).collect()
-        }
-        ExceptionUtils.getRootCause(e) match {
-          case ex: AnalysisException =>
-            checkError(
-              exception = ex,
-              errorClass = "AVRO_LOWER_PRECISION",
-              parameters = Map("avroPath" -> "field 'a'",
-                "sqlPath" -> "field 'a'",
-                "avroType" -> "decimal\\(12,10\\)",
-                "sqlType" -> "\"DECIMAL\\(4,3\\)\"",
-                "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-              matchPVals = true
-            )
-          case other =>
-            fail(s"Received unexpected exception", other)
-        }
-      }
-      // The following used to work, so it should still work with the flag 
enabled
-      checkAnswer(
-        spark.read.schema("a DECIMAL(5, 
3)").format("avro").load(path.toString),
-        Row(new java.math.BigDecimal("13.123"))
-      )
-      withSQLConf(confKey -> "true") {
-        // With the flag enabled, we return a null silently, which isn't great
-        checkAnswer(
-          spark.read.schema("a DECIMAL(4, 
3)").format("avro").load(path.toString),
-          Row(null)
-        )
-        checkAnswer(
-          spark.read.schema("a DECIMAL(5, 
3)").format("avro").load(path.toString),
-          Row(new java.math.BigDecimal("13.123"))
-        )
-      }
-    }
-  }
-
-  test("SPARK-43380: Fix Avro data type conversion" +
-    " of DayTimeIntervalType to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), 
false)))
-      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
-
-      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-      df.write.format("avro").save(path.getCanonicalPath)
-
-      withSQLConf(confKey -> "false") {
-        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
-          val e = intercept[SparkException] {
-            spark.read.schema(s"a 
$sqlType").format("avro").load(path.toString).collect()
-          }
-
-          ExceptionUtils.getRootCause(e) match {
-            case ex: AnalysisException =>
-              checkError(
-                exception = ex,
-                errorClass = "AVRO_INCORRECT_TYPE",
-                parameters = Map("avroPath" -> "field 'a'",
-                  "sqlPath" -> "field 'a'",
-                  "avroType" -> "interval day to second",
-                  "sqlType" -> s""""$sqlType"""",
-                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-                matchPVals = true
-              )
-            case other =>
-              fail(s"Received unexpected exception", other)
-          }
-        }
-      }
-
-      withSQLConf(confKey -> "true") {
-        // Allow conversion and do not need to check result
-        spark.read.schema("a Date").format("avro").load(path.toString)
-        spark.read.schema("a timestamp").format("avro").load(path.toString)
-        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
-      }
-    }
-  }
-
-  test("SPARK-43380: Fix Avro data type conversion" +
-    " of YearMonthIntervalType to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), 
false)))
-      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
-
-      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-      df.write.format("avro").save(path.getCanonicalPath)
-
-      withSQLConf(confKey -> "false") {
-        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
-          val e = intercept[SparkException] {
-            spark.read.schema(s"a 
$sqlType").format("avro").load(path.toString).collect()
-          }
-
-          ExceptionUtils.getRootCause(e) match {
-            case ex: AnalysisException =>
-              checkError(
-                exception = ex,
-                errorClass = "AVRO_INCORRECT_TYPE",
-                parameters = Map("avroPath" -> "field 'a'",
-                  "sqlPath" -> "field 'a'",
-                  "avroType" -> "interval year to month",
-                  "sqlType" -> s""""$sqlType"""",
-                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-                matchPVals = true
-              )
-            case other =>
-              fail(s"Received unexpected exception", other)
-          }
-        }
-      }
-
-      withSQLConf(confKey -> "true") {
-        // Allow conversion and do not need to check result
-        spark.read.schema("a Date").format("avro").load(path.toString)
-        spark.read.schema("a timestamp").format("avro").load(path.toString)
-        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
-      }
-    }
-  }
-
-  Seq(
-    "time-millis",
-    "time-micros",
-    "timestamp-micros",
-    "timestamp-millis",
-    "local-timestamp-millis",
-    "local-timestamp-micros"
-  ).foreach { timeLogicalType =>
-    test(s"converting $timeLogicalType type to long in avro") {
-      withTempPath { path =>
-        val df = Seq(100L)
-          .toDF("dt")
-        val avroSchema =
-          s"""
-             |{
-             |  "type" : "record",
-             |  "name" : "test_schema",
-             |  "fields" : [
-             |    {"name": "dt", "type": {"type": "long", "logicalType": 
"$timeLogicalType"}}
-             |  ]
-             |}""".stripMargin
-        df.write.format("avro").option("avroSchema", 
avroSchema).save(path.getCanonicalPath)
-        checkAnswer(
-          spark.read.schema(s"dt long").format("avro").load(path.toString),
-          Row(100L))
-      }
-    }
-  }
-
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index b099560184e..da802e0ad6a 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -93,18 +93,6 @@ Invalid as-of join.
 
 For more details see 
[AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)
 
-### AVRO_INCORRECT_TYPE
-
-SQLSTATE: none assigned
-
-Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original 
encoded data type is `<avroType>`, however you're trying to read the field as 
`<sqlType>`, which would lead to an incorrect answer. To allow reading this 
field, enable the SQL configuration: `<key>`.
-
-### AVRO_LOWER_PRECISION
-
-SQLSTATE: none assigned
-
-Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original 
encoded data type is `<avroType>`, however you're trying to read the field as 
`<sqlType>`, which leads to data being read as null. Please provide a wider 
decimal type to get the correct result. To allow reading null to this field, 
enable the SQL configuration: `<key>`.
-
 ### BATCH_METADATA_NOT_FOUND
 
 [SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2186,5 +2174,3 @@ The operation `<operation>` requires a `<requiredType>`. 
But `<objectName>` is a
 The `<functionName>` requires `<expectedNum>` parameters but the actual number 
is `<actualNum>`.
 
 For more details see 
[WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)
-
-
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 59b125cbc82..bc5f442220b 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |
 
 - Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by 
default. These options include: `pushDownAggregate`, `pushDownLimit`, 
`pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, 
please set them to `false`. e.g. set 
`spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
 - Since Spark 3.5, Spark thrift server will interrupt task when canceling a 
running statement. To restore the previous behavior, set 
`spark.sql.thriftServer.interruptOnCancel` to `false`.
-- Since Spark 3.5, the Avro will throw `AnalysisException` when reading 
Interval types as Date or Timestamp types, or reading Decimal types with lower 
precision. To restore the legacy behavior, set 
`spark.sql.legacy.avro.allowIncompatibleSchema` to `true`
 - Since Spark 3.5, Row's json and prettyJson methods are moved to `ToJsonUtil`.
 - Since Spark 3.5, the `plan` field is moved from `AnalysisException` to 
`EnhancedAnalysisException`.
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 2ef9c15a7f9..52ccba8541d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3720,36 +3720,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
     )
   }
 
-  def avroIncorrectTypeError(
-      avroPath: String, sqlPath: String, avroType: String,
-      sqlType: String, key: String): Throwable = {
-    new AnalysisException(
-      errorClass = "AVRO_INCORRECT_TYPE",
-      messageParameters = Map(
-        "avroPath" -> avroPath,
-        "sqlPath" -> sqlPath,
-        "avroType" -> avroType,
-        "sqlType" -> toSQLType(sqlType),
-        "key" -> key
-      )
-    )
-  }
-
-  def avroLowerPrecisionError(
-      avroPath: String, sqlPath: String, avroType: String,
-      sqlType: String, key: String): Throwable = {
-    new AnalysisException(
-      errorClass = "AVRO_LOWER_PRECISION",
-      messageParameters = Map(
-        "avroPath" -> avroPath,
-        "sqlPath" -> sqlPath,
-        "avroType" -> avroType,
-        "sqlType" -> toSQLType(sqlType),
-        "key" -> key
-      )
-    )
-  }
-
   def optionMustBeLiteralString(key: String): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ced3f3458c0..ac1b4918d97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4288,18 +4288,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
-    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
-      .internal()
-      .doc("When set to false, if types in Avro are encoded in the same 
format, but " +
-        "the type in the Avro schema explicitly says that the data types are 
different, " +
-        "reject reading the data type in the format to avoid returning 
incorrect results. " +
-        "When set to true, it restores the legacy behavior of allow reading 
the data in the" +
-        " format, which may return incorrect results.")
-      .version("3.5.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to