This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new afe2247  [SPARK-31405][SQL][3.0] Fail by default when reading/writing 
legacy datetime values from/to Parquet/Avro files
afe2247 is described below

commit afe2247cffda4dc46c41e3db9d1dc9853beadd28
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Sun May 17 02:32:39 2020 +0000

    [SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy 
datetime values from/to Parquet/Avro files
    
    ### What changes were proposed in this pull request?
    
    When reading/writing datetime values that before the rebase switch day, 
from/to Avro/Parquet files, fail by default and ask users to set a config to 
explicitly do rebase or not.
    
    ### Why are the changes needed?
    
    Rebase or not rebase have different behaviors and we should let users 
decide it explicitly. In most cases, users won't hit this exception as it only 
affects ancient datetime values.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now users will see an error when reading/writing dates before 
1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an 
error message to ask setting a config.
    
    ### How was this patch tested?
    
    updated tests
    
    Closes #28526 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/SparkException.scala    |   2 +-
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  37 ++++----
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  10 +-
 .../apache/spark/sql/avro/AvroOutputWriter.scala   |  13 ++-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  30 +++---
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   |   9 +-
 .../sql/avro/AvroCatalystDataConversionSuite.scala |   2 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala      |  91 ++++++++++++------
 .../spark/sql/catalyst/json/JacksonParser.scala    |   1 +
 .../spark/sql/catalyst/util/RebaseDateTime.scala   |   4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 100 ++++++++++----------
 .../parquet/VectorizedColumnReader.java            |  91 ++++++++++++------
 .../parquet/VectorizedParquetRecordReader.java     |  12 +--
 .../parquet/VectorizedPlainValuesReader.java       |  29 ++++--
 .../parquet/VectorizedRleValuesReader.java         |  31 ++++---
 .../parquet/VectorizedValuesReader.java            |   4 +-
 .../execution/datasources/DataSourceUtils.scala    | 103 ++++++++++++++++++++-
 .../sql/execution/datasources/FileScanRDD.scala    |   6 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  11 +--
 .../datasources/parquet/ParquetReadSupport.scala   |   7 +-
 .../parquet/ParquetRecordMaterializer.scala        |   9 +-
 .../datasources/parquet/ParquetRowConverter.scala  |  68 ++++++--------
 .../datasources/parquet/ParquetWriteSupport.scala  |  46 ++++-----
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  23 ++---
 .../benchmark/DateTimeRebaseBenchmark.scala        |   4 +-
 .../datasources/parquet/ParquetIOSuite.scala       |  89 +++++++++++-------
 .../spark/sql/sources/HadoopFsRelationTest.scala   |   6 +-
 27 files changed, 519 insertions(+), 319 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala 
b/core/src/main/scala/org/apache/spark/SparkException.scala
index 81c087e..4138213 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: 
String)
  * Exception thrown when Spark returns different result after upgrading to a 
new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, 
cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading 
of Spark" +
+  extends RuntimeException("You may get a different result due to the 
upgrading of Spark" +
     s" $version: $message", cause)
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 27206ed..4fc8040 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 /**
  * A deserializer to deserialize data in avro format to data in catalyst 
format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, 
rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
 
   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }
 
   private lazy val decimalConversions = new DecimalConversion()
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = 
DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -96,13 +107,8 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType, rebaseD
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
-      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-        val days = value.asInstanceOf[Int]
-        val rebasedDays = rebaseJulianToGregorianDays(days)
-        updater.setInt(ordinal, rebasedDays)
-
       case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -110,22 +116,13 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType, rebaseD
       case (LONG, TimestampType) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not 
logical type
         // (the `null` case), the value is processed as timestamp type with 
millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, 
value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.fromMillis(millis)
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          updater.setLong(ordinal, micros)
-        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) 
=>
-          val micros = value.asInstanceOf[Long]
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp 
type.")
       }
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e69c95b..59d54bc 100755
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
         reader.sync(file.start)
         val stop = file.start + file.length
 
-        val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-          reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-        }
+        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+          reader.asInstanceOf[DataFileReader[_]].getMetaString,
+          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+
         val deserializer = new AvroDeserializer(
-          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, 
rebaseDateTime)
+          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, 
datetimeRebaseMode)
 
         new Iterator[InternalRow] {
           private[this] var completed = false
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 82a5680..ac9608c 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 // NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
@@ -43,12 +44,12 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {
 
   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))
 
   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, 
datetimeRebaseMode)
 
   /**
    * Overrides the couple of methods responsible for generating the output 
streams / files so
@@ -56,7 +57,11 @@ private[avro] class AvroOutputWriter(
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] 
= {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }
 
     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index dc23216..d6cfbc5 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, 
SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: 
Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = 
DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
@@ -146,24 +154,16 @@ class AvroSerializer(
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
-      case (DateType, INT) if rebaseDateTime =>
-        (getter, ordinal) => 
rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
       case (DateType, INT) =>
-        (getter, ordinal) => getter.getInt(ordinal)
+        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
 
       case (TimestampType, LONG) => avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is 
not logical type
           // (the `null` case), output the timestamp value as with millisecond 
precision.
-          case null | _: TimestampMillis if rebaseDateTime => (getter, 
ordinal) =>
-            val micros = getter.getLong(ordinal)
-            val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-            DateTimeUtils.toMillis(rebasedMicros)
           case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.toMillis(getter.getLong(ordinal))
-          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-            rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-          case _: TimestampMicros => (getter, ordinal) => 
getter.getLong(ordinal)
+            
DateTimeUtils.toMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+          case _: TimestampMicros => (getter, ordinal) =>
+            timestampRebaseFunc(getter.getLong(ordinal))
           case other => throw new IncompatibleSchemaException(
             s"Cannot convert Catalyst Timestamp type to Avro logical type 
${other}")
         }
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
 
b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 712aec6..15918f4 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, 
rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, 
datetimeRebaseMode)
 
       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index 64d790b..c8a1f67 100644
--- 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType, 
rebaseDateTime = false)
+    val deserializer = new AvroDeserializer(avroSchema, dataType)
 
     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 3e754f0..a5c1fb1 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, 
GenericDatumReader, GenericDatumWri
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, 
SparkUpgradeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.v2.avro.AvroScan
@@ -1538,13 +1539,28 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
           val df = 
Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-          df.write.format("avro").save(path3_0)
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = 
intercept[SparkException](df.write.format("avro").save(path3_0))
+          
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = 
intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString) {
+            df.write.format("avro").mode("overwrite").save(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
             df.write.format("avro").save(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+
+          // For Avro files written by Spark 3.0, we know the writer info and 
don't need the config
+          // to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+          }
         } else {
           val df = 
Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
           val avroSchema =
@@ -1556,24 +1572,39 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
               |    {"name": "ts", "type": {"type": "long", "logicalType": 
"$dt"}}
               |  ]
               |}""".stripMargin
-          df.write.format("avro").option("avroSchema", 
avroSchema).save(path3_0)
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException] {
+            df.write.format("avro").option("avroSchema", 
avroSchema).save(path3_0)
+          }
+          
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = 
intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString) {
+            df.write.format("avro").option("avroSchema", 
avroSchema).mode("overwrite").save(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
             df.write.format("avro").option("avroSchema", 
avroSchema).save(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+
+          // For Avro files written by Spark 3.0, we know the writer info and 
don't need the config
+          // to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          }
         }
       }
     }
 
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
-      checkReadMixedFiles(
-        "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 
01:02:03.123456")
-      checkReadMixedFiles(
-        "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 
01:02:03.124")
-    }
+    checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
+    checkReadMixedFiles(
+      "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 
01:02:03.123456")
+    checkReadMixedFiles(
+      "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 
01:02:03.124")
   }
 
   test("SPARK-31183: rebasing microseconds timestamps in write") {
@@ -1581,7 +1612,7 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
     val nonRebased = "1001-01-07 01:09:05.123456"
     withTempPath { dir =>
       val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
         Seq(tsStr).toDF("tsS")
           .select($"tsS".cast("timestamp").as("ts"))
           .write.format("avro")
@@ -1589,9 +1620,9 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
       }
 
       // The file metadata indicates if it needs rebase or not, so we can 
always get the correct
-      // result regardless of the "rebaseInRead" config.
-      Seq(true, false).foreach { rebase =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> 
rebase.toString) {
+      // result regardless of the "rebase mode" config.
+      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
mode.toString) {
           checkAnswer(spark.read.format("avro").load(path), 
Row(Timestamp.valueOf(tsStr)))
         }
       }
@@ -1622,7 +1653,7 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
         |}""".stripMargin
       withTempPath { dir =>
         val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
           Seq(tsStr).toDF("tsS")
             .select($"tsS".cast("timestamp").as("ts"))
             .write
@@ -1632,9 +1663,9 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
         }
 
         // The file metadata indicates if it needs rebase or not, so we can 
always get the correct
-        // result regardless of the "rebaseInRead" config.
-        Seq(true, false).foreach { rebase =>
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> 
rebase.toString) {
+        // result regardless of the "rebase mode" config.
+        Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
mode.toString) {
             checkAnswer(
               spark.read.schema("ts timestamp").format("avro").load(path),
               Row(Timestamp.valueOf(rebased)))
@@ -1655,7 +1686,7 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
   test("SPARK-31183: rebasing dates in write") {
     withTempPath { dir =>
       val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
         Seq("1001-01-01").toDF("dateS")
           .select($"dateS".cast("date").as("date"))
           .write.format("avro")
@@ -1663,9 +1694,9 @@ abstract class AvroSuite extends QueryTest with 
SharedSparkSession {
       }
 
       // The file metadata indicates if it needs rebase or not, so we can 
always get the correct
-      // result regardless of the "rebaseInRead" config.
-      Seq(true, false).foreach { rebase =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> 
rebase.toString) {
+      // result regardless of the "rebase mode" config.
+      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
mode.toString) {
           checkAnswer(spark.read.format("avro").load(path), 
Row(Date.valueOf("1001-01-01")))
         }
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index a52c345..ef98793 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -456,6 +456,7 @@ class JacksonParser(
         }
       }
     } catch {
+      case e: SparkUpgradeException => throw e
       case e @ (_: RuntimeException | _: JsonProcessingException | _: 
MalformedInputException) =>
         // JSON parser currently doesn't support partial results for corrupted 
records.
         // For such records, all fields other than the field configured by
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
index eb67ff7..e29fa4b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
@@ -146,6 +146,8 @@ object RebaseDateTime {
     -354226, -317702, -244653, -208129, -171605, -141436, -141435, -141434,
     -141433, -141432, -141431, -141430, -141429, -141428, -141427)
 
+  final val lastSwitchGregorianDay: Int = gregJulianDiffSwitchDay.last
+
   // The first days of Common Era (CE) which is mapped to the '0001-01-01' date
   // in Proleptic Gregorian calendar.
   private final val gregorianCommonEraStartDay = gregJulianDiffSwitchDay(0)
@@ -295,7 +297,7 @@ object RebaseDateTime {
   }
   // The switch time point after which all diffs between Gregorian and Julian 
calendars
   // across all time zones are zero
-  private final val lastSwitchGregorianTs: Long = 
getLastSwitchTs(gregJulianRebaseMap)
+  final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap)
 
   private final val gregorianStartTs = LocalDateTime.of(gregorianStartDate, 
LocalTime.MIDNIGHT)
   private final val julianEndTs = LocalDateTime.of(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b073f7e..d0b55ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2500,57 +2500,63 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian 
calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the 
resulted date/" +
-        "timestamp in the target calendar, and getting the number of 
micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic 
Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing 
Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and write the 
dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if 
it sees " +
+        "ancient dates/timestamps that are ambiguous between the two 
calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the 
resulted date/" +
-        "timestamp in the target calendar, and getting the number of 
micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy 
hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading 
Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and read the 
dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if 
it sees " +
+        "ancient dates/timestamps that are ambiguous between the two 
calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Parquet 
files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
-  val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+  val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
       .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian 
calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the 
resulted date/" +
-        "timestamp in the target calendar, and getting the number of 
micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic 
Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro 
files. " +
+        "When CORRECTED, Spark will not do rebase and write the 
dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if 
it sees " +
+        "ancient dates/timestamps that are ambiguous between the two 
calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the 
resulted date/" +
-        "timestamp in the target calendar, and getting the number of 
micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_AVRO_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy 
hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro 
files. " +
+        "When CORRECTED, Spark will not do rebase and read the 
dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if 
it sees " +
+        "ancient dates/timestamps that are ambiguous between the two 
calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Avro 
files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
   val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
     buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
@@ -3136,10 +3142,6 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
-  def parquetRebaseDateTimeInReadEnabled: Boolean = {
-    getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-  }
-
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 11ce11d..f264281 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -36,6 +36,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
@@ -102,14 +103,14 @@ public class VectorizedColumnReader {
   // The timezone conversion to apply to int96 timestamps. Null if no 
conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
-  private final boolean rebaseDateTime;
+  private final String datetimeRebaseMode;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
       OriginalType originalType,
       PageReader pageReader,
       ZoneId convertTz,
-      boolean rebaseDateTime) throws IOException {
+      String datetimeRebaseMode) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
@@ -132,7 +133,9 @@ public class VectorizedColumnReader {
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
-    this.rebaseDateTime = rebaseDateTime;
+    assert "LEGACY".equals(datetimeRebaseMode) || 
"EXCEPTION".equals(datetimeRebaseMode) ||
+      "CORRECTED".equals(datetimeRebaseMode);
+    this.datetimeRebaseMode = datetimeRebaseMode;
   }
 
   /**
@@ -156,11 +159,11 @@ public class VectorizedColumnReader {
     boolean isSupported = false;
     switch (typeName) {
       case INT32:
-        isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
+        isSupported = originalType != OriginalType.DATE || 
"CORRECTED".equals(datetimeRebaseMode);
         break;
       case INT64:
         if (originalType == OriginalType.TIMESTAMP_MICROS) {
-          isSupported = !rebaseDateTime;
+          isSupported = "CORRECTED".equals(datetimeRebaseMode);
         } else {
           isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
         }
@@ -174,6 +177,30 @@ public class VectorizedColumnReader {
     return isSupported;
   }
 
+  static int rebaseDays(int julianDays, final boolean failIfRebase) {
+    if (failIfRebase) {
+      if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        return julianDays;
+      }
+    } else {
+      return RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
+    }
+  }
+
+  static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
+    if (failIfRebase) {
+      if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        return julianMicros;
+      }
+    } else {
+      return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+    }
+  }
+
   /**
    * Reads `total` values from this columnReader into column.
    */
@@ -283,7 +310,7 @@ public class VectorizedColumnReader {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType()) ||
-            (column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
+            (column.dataType() == DataTypes.DateType && 
"CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putInt(i, 
dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -302,11 +329,11 @@ public class VectorizedColumnReader {
             }
           }
         } else if (column.dataType() == DataTypes.DateType) {
+          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               int julianDays = 
dictionary.decodeToInt(dictionaryIds.getDictId(i));
-              int gregorianDays = 
RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
-              column.putInt(i, gregorianDays);
+              column.putInt(i, rebaseDays(julianDays, failIfRebase));
             }
           }
         } else {
@@ -317,36 +344,37 @@ public class VectorizedColumnReader {
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType()) ||
-            (originalType == OriginalType.TIMESTAMP_MICROS && 
!rebaseDateTime)) {
+            (originalType == OriginalType.TIMESTAMP_MICROS &&
+              "CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, 
dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-          if (rebaseDateTime) {
+          if ("CORRECTED".equals(datetimeRebaseMode)) {
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
-                long julianMillis = 
dictionary.decodeToLong(dictionaryIds.getDictId(i));
-                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
-                long gregorianMicros = 
RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
-                column.putLong(i, gregorianMicros);
+                long gregorianMillis = 
dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
               }
             }
           } else {
+            final boolean failIfRebase = 
"EXCEPTION".equals(datetimeRebaseMode);
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
-                long gregorianMillis = 
dictionary.decodeToLong(dictionaryIds.getDictId(i));
-                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
+                long julianMillis = 
dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
+                column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
               }
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               long julianMicros = 
dictionary.decodeToLong(dictionaryIds.getDictId(i));
-              long gregorianMicros = 
RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
-              column.putLong(i, gregorianMicros);
+              column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
             }
           }
         } else {
@@ -466,12 +494,13 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
     } else if (column.dataType() == DataTypes.DateType ) {
-      if (rebaseDateTime) {
-        defColumn.readIntegersWithRebase(
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
+        defColumn.readIntegers(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
       } else {
-        defColumn.readIntegers(
-           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+        defColumn.readIntegersWithRebase(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn, failIfRebase);
       }
     } else {
       throw constructConvertNotSupportedException(descriptor, column);
@@ -485,27 +514,29 @@ public class VectorizedColumnReader {
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
-      if (rebaseDateTime) {
-        defColumn.readLongsWithRebase(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
         defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+        defColumn.readLongsWithRebase(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn, failIfRebase);
       }
     } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-      if (rebaseDateTime) {
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
-            long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
-            column.putLong(rowId + i, 
RebaseDateTime.rebaseJulianToGregorianMicros(micros));
+            column.putLong(rowId + i, 
DateTimeUtils.fromMillis(dataColumn.readLong()));
           } else {
             column.putNull(rowId + i);
           }
         }
       } else {
+        final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
-            column.putLong(rowId + i, 
DateTimeUtils.fromMillis(dataColumn.readLong()));
+            long julianMicros = 
DateTimeUtils.fromMillis(dataColumn.readLong());
+            column.putLong(rowId + i, rebaseMicros(julianMicros, 
failIfRebase));
           } else {
             column.putNull(rowId + i);
           }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index c9590b9..b40cc15 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -89,9 +89,9 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   private final ZoneId convertTz;
 
   /**
-   * true if need to rebase date/timestamp from Julian to Proleptic Gregorian 
calendar.
+   * The mode of rebasing date/timestamp from Julian to Proleptic Gregorian 
calendar.
    */
-  private final boolean rebaseDateTime;
+  private final String datetimeRebaseMode;
 
   /**
    * columnBatch object that is used for batch decoding. This is created on 
first use and triggers
@@ -122,16 +122,16 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   private final MemoryMode MEMORY_MODE;
 
   public VectorizedParquetRecordReader(
-    ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int 
capacity) {
+    ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int 
capacity) {
     this.convertTz = convertTz;
-    this.rebaseDateTime = rebaseDateTime;
+    this.datetimeRebaseMode = datetimeRebaseMode;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.capacity = capacity;
   }
 
   // For test only.
   public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
-    this(null, false, useOffHeap, capacity);
+    this(null, "CORRECTED", useOffHeap, capacity);
   }
 
   /**
@@ -321,7 +321,7 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i), 
types.get(i).getOriginalType(),
-        pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
+        pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 2ed2e11..eddbf39 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -21,13 +21,14 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.ParquetDecodingException;
+
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.api.Binary;
-
 /**
  * An implementation of the Parquet PLAIN decoder that supports the vectorized 
interface.
  */
@@ -86,7 +87,8 @@ public class VectorizedPlainValuesReader extends ValuesReader 
implements Vectori
   // iterates the values twice: check if we need to rebase first, then go to 
the optimized branch
   // if rebase is not needed.
   @Override
-  public final void readIntegersWithRebase(int total, WritableColumnVector c, 
int rowId) {
+  public final void readIntegersWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     int requiredBytes = total * 4;
     ByteBuffer buffer = getBuffer(requiredBytes);
     boolean rebase = false;
@@ -94,8 +96,12 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
       rebase |= buffer.getInt(buffer.position() + i * 4) < 
RebaseDateTime.lastSwitchJulianDay();
     }
     if (rebase) {
-      for (int i = 0; i < total; i += 1) {
-        c.putInt(rowId + i, 
RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
+      if (failIfRebase) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        for (int i = 0; i < total; i += 1) {
+          c.putInt(rowId + i, 
RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
+        }
       }
     } else {
       if (buffer.hasArray()) {
@@ -128,7 +134,8 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   // iterates the values twice: check if we need to rebase first, then go to 
the optimized branch
   // if rebase is not needed.
   @Override
-  public final void readLongsWithRebase(int total, WritableColumnVector c, int 
rowId) {
+  public final void readLongsWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     int requiredBytes = total * 8;
     ByteBuffer buffer = getBuffer(requiredBytes);
     boolean rebase = false;
@@ -136,8 +143,12 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
       rebase |= buffer.getLong(buffer.position() + i * 8) < 
RebaseDateTime.lastSwitchJulianTs();
     }
     if (rebase) {
-      for (int i = 0; i < total; i += 1) {
-        c.putLong(rowId + i, 
RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
+      if (failIfRebase) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        for (int i = 0; i < total; i += 1) {
+          c.putLong(rowId + i, 
RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
+        }
       }
     } else {
       if (buffer.hasArray()) {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 4d72a33..24347a4e 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -26,12 +29,8 @@ import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
-import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of 
the version in
  * parquet-mr with these changes:
@@ -211,7 +210,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) throws IOException {
+      VectorizedValuesReader data,
+      final boolean failIfRebase) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -219,7 +219,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            data.readIntegersWithRebase(n, c, rowId);
+            data.readIntegersWithRebase(n, c, rowId, failIfRebase);
           } else {
             c.putNulls(rowId, n);
           }
@@ -227,8 +227,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putInt(rowId + i,
-                
RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger()));
+              int julianDays = data.readInteger();
+              c.putInt(rowId + i, 
VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
@@ -387,7 +387,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) throws IOException {
+      VectorizedValuesReader data,
+      final boolean failIfRebase) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -395,7 +396,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            data.readLongsWithRebase(n, c, rowId);
+            data.readLongsWithRebase(n, c, rowId, failIfRebase);
           } else {
             c.putNulls(rowId, n);
           }
@@ -403,8 +404,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putLong(rowId + i,
-                RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong()));
+              long julianMicros = data.readLong();
+              c.putLong(rowId + i, 
VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
@@ -584,7 +585,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   @Override
-  public void readIntegersWithRebase(int total, WritableColumnVector c, int 
rowId) {
+  public void readIntegersWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
@@ -604,7 +606,8 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   @Override
-  public void readLongsWithRebase(int total, WritableColumnVector c, int 
rowId) {
+  public void readLongsWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 809ac44..35db8f2 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -40,9 +40,9 @@ public interface VectorizedValuesReader {
   void readBooleans(int total, WritableColumnVector c, int rowId);
   void readBytes(int total, WritableColumnVector c, int rowId);
   void readIntegers(int total, WritableColumnVector c, int rowId);
-  void readIntegersWithRebase(int total, WritableColumnVector c, int rowId);
+  void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, 
boolean failIfRebase);
   void readLongs(int total, WritableColumnVector c, int rowId);
-  void readLongsWithRebase(int total, WritableColumnVector c, int rowId);
+  void readLongsWithRebase(int total, WritableColumnVector c, int rowId, 
boolean failIfRebase);
   void readFloats(int total, WritableColumnVector c, int rowId);
   void readDoubles(int total, WritableColumnVector c, int rowId);
   void readBinary(int total, WritableColumnVector c, int rowId);
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 45a9b1a..abb74d8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -23,9 +23,12 @@ import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -84,17 +87,107 @@ object DataSourceUtils {
       case _ => false
     }
 
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String,
+      modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting && 
SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to 
decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid 
calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if 
they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    val config = if (format == "Parquet") {
+      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
+    } else if (format == "Avro") {
+      SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
+    } else {
+      throw new IllegalStateException("unrecognized format " + format)
     }
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or 
timestamps before " +
+      s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files 
may be written by " +
+      "Spark 2.x or legacy versions of Hive, which uses a legacy hybrid 
calendar that is " +
+      "different from Spark 3.0+'s Proleptic Gregorian calendar. See more 
details in " +
+      s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime 
values w.r.t. " +
+      s"the calendar difference during reading. Or set $config to 'CORRECTED' 
to read the " +
+      "datetime values as it is.", null)
+  }
+
+  def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
+    val config = if (format == "Parquet") {
+      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key
+    } else if (format == "Avro") {
+      SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key
+    } else {
+      throw new IllegalStateException("unrecognized format " + format)
+    }
+    new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or 
timestamps before " +
+      s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files 
may be read by " +
+      "Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid 
calendar that is " +
+      "different from Spark 3.0+'s Proleptic Gregorian calendar. See more 
details in " +
+      s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime 
values w.r.t. " +
+      "the calendar difference during writing, to get maximum 
interoperability. Or set " +
+      s"$config to 'CORRECTED' to write the datetime values as it is, if you 
are 100% sure that " +
+      "the written files will only be read by Spark 3.0+ or other systems that 
use Proleptic " +
+      "Gregorian calendar.", null)
+  }
+
+  def creteDateRebaseFuncInRead(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Int => Int = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
+      if (days < RebaseDateTime.lastSwitchJulianDay) {
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
+      }
+      days
+    case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseJulianToGregorianDays
+    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
+  }
+
+  def creteDateRebaseFuncInWrite(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Int => Int = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
+      if (days < RebaseDateTime.lastSwitchGregorianDay) {
+        throw DataSourceUtils.newRebaseExceptionInWrite(format)
+      }
+      days
+    case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianDays
+    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
+  }
+
+  def creteTimestampRebaseFuncInRead(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Long => Long = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
+      if (micros < RebaseDateTime.lastSwitchJulianTs) {
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
+      }
+      micros
+    case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseJulianToGregorianMicros
+    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
+  }
+
+  def creteTimestampRebaseFuncInWrite(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Long => Long = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
+      if (micros < RebaseDateTime.lastSwitchGregorianTs) {
+        throw DataSourceUtils.newRebaseExceptionInWrite(format)
+      }
+      micros
+    case LegacyBehaviorPolicy.LEGACY => 
RebaseDateTime.rebaseGregorianToJulianMicros
+    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 542c996..fc59336 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -21,7 +21,7 @@ import java.io.{FileNotFoundException, IOException}
 
 import org.apache.parquet.io.ParquetDecodingException
 
-import org.apache.spark.{Partition => RDDPartition, TaskContext}
+import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, 
TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
@@ -178,7 +178,9 @@ class FileScanRDD(
                 s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
               throw new QueryExecutionException(message, e)
             case e: ParquetDecodingException =>
-              if (e.getMessage.contains("Can not read value at")) {
+              if (e.getCause.isInstanceOf[SparkUpgradeException]) {
+                throw e.getCause
+              } else if (e.getMessage.contains("Can not read value at")) {
                 val message = "Encounter error while reading parquet files. " +
                   "One possible cause: Parquet column cannot be converted in 
the " +
                   "corresponding files. Details: "
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6d9ddf..7187410 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -300,10 +300,9 @@ class ParquetFileFormat
           None
         }
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        footerFileMetaData.getKeyValueMetaData.get).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
       val hadoopAttemptContext =
@@ -318,7 +317,7 @@ class ParquetFileFormat
       if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull,
-          rebaseDateTime,
+          datetimeRebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
@@ -337,7 +336,7 @@ class ParquetFileFormat
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
         val readSupport = new ParquetReadSupport(
-          convertTz, enableVectorizedReader = false, rebaseDateTime)
+          convertTz, enableVectorizedReader = false, datetimeRebaseMode)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 28165e0..a30d1c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -53,7 +54,7 @@ import org.apache.spark.sql.types._
 class ParquetReadSupport(
     val convertTz: Option[ZoneId],
     enableVectorizedReader: Boolean,
-    rebaseDateTime: Boolean)
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
   extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
@@ -61,7 +62,7 @@ class ParquetReadSupport(
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  
But that is only
     // used in the vectorized reader, where we get the 
convertTz/rebaseDateTime value directly,
     // and the values here are ignored.
-    this(None, enableVectorizedReader = true, rebaseDateTime = false)
+    this(None, enableVectorizedReader = true, datetimeRebaseMode = 
LegacyBehaviorPolicy.CORRECTED)
   }
 
   /**
@@ -130,7 +131,7 @@ class ParquetReadSupport(
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
       new ParquetToSparkSchemaConverter(conf),
       convertTz,
-      rebaseDateTime)
+      datetimeRebaseMode)
   }
 }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index ec03713..bb528d5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -23,6 +23,7 @@ import org.apache.parquet.io.api.{GroupConverter, 
RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -32,19 +33,19 @@ import org.apache.spark.sql.types.StructType
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps 
initializing row converters
  * @param convertTz the optional time zone to convert to int96 data
- * @param rebaseDateTime true if need to rebase date/timestamp from Julian to 
Proleptic Gregorian
- *                       calendar
+ * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian 
to Proleptic Gregorian
+ *                           calendar
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType,
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[ZoneId],
-    rebaseDateTime: Boolean)
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
   extends RecordMaterializer[InternalRow] {
 
   private val rootConverter = new ParquetRowConverter(
-    schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, 
NoopUpdater)
+    schemaConverter, parquetSchema, catalystSchema, convertTz, 
datetimeRebaseMode, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 08fbca2..9d37f17 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -121,8 +122,8 @@ private[parquet] class ParquetPrimitiveConverter(val 
updater: ParentContainerUpd
  * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type. User-defined
  *        types should have been expanded.
  * @param convertTz the optional time zone to convert to int96 data
- * @param rebaseDateTime true if need to rebase date/timestamp from Julian to 
Proleptic Gregorian
- *                       calendar
+ * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian 
to Proleptic Gregorian
+ *                           calendar
  * @param updater An updater which propagates converted field values to the 
parent container
  */
 private[parquet] class ParquetRowConverter(
@@ -130,7 +131,7 @@ private[parquet] class ParquetRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     convertTz: Option[ZoneId],
-    rebaseDateTime: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -181,6 +182,12 @@ private[parquet] class ParquetRowConverter(
    */
   def currentRecord: InternalRow = currentRow
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Parquet")
+
+  private val timestampRebaseFunc = 
DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Parquet")
+
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with 
HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is 
false
@@ -275,35 +282,17 @@ private[parquet] class ParquetRowConverter(
         new ParquetStringConverter(updater)
 
       case TimestampType if parquetType.getOriginalType == 
OriginalType.TIMESTAMP_MICROS =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              val rebased = rebaseJulianToGregorianMicros(value)
-              updater.setLong(rebased)
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              updater.setLong(value)
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(timestampRebaseFunc(value))
           }
         }
 
       case TimestampType if parquetType.getOriginalType == 
OriginalType.TIMESTAMP_MILLIS =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              val micros = DateTimeUtils.fromMillis(value)
-              val rebased = rebaseJulianToGregorianMicros(micros)
-              updater.setLong(rebased)
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              updater.setLong(DateTimeUtils.fromMillis(value))
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            val micros = DateTimeUtils.fromMillis(value)
+            updater.setLong(timestampRebaseFunc(micros))
           }
         }
 
@@ -328,17 +317,9 @@ private[parquet] class ParquetRowConverter(
         }
 
       case DateType =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addInt(value: Int): Unit = {
-              updater.set(rebaseJulianToGregorianDays(value))
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addInt(value: Int): Unit = {
-              updater.set(value)
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            updater.set(dateRebaseFunc(value))
           }
         }
 
@@ -386,7 +367,12 @@ private[parquet] class ParquetRowConverter(
           }
         }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, 
rebaseDateTime, wrappedUpdater)
+          schemaConverter,
+          parquetType.asGroupType(),
+          t,
+          convertTz,
+          datetimeRebaseMode,
+          wrappedUpdater)
 
       case t =>
         throw new RuntimeException(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e367b9c..4e535c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -78,9 +79,14 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
   private val decimalBuffer =
     new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))
 
-  // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE))
+
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Parquet")
+
+  private val timestampRebaseFunc = 
DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Parquet")
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -103,7 +109,13 @@ class ParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
     val metadata = Map(
       SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
       ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
-    ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None)
+    ) ++ {
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
+    }
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -152,12 +164,11 @@ class ParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getShort(ordinal))
 
-      case DateType if rebaseDateTime =>
+      case DateType =>
         (row: SpecializedGetters, ordinal: Int) =>
-          val rebasedDays = rebaseGregorianToJulianDays(row.getInt(ordinal))
-          recordConsumer.addInteger(rebasedDays)
+          recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal)))
 
-      case IntegerType | DateType =>
+      case IntegerType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getInt(ordinal))
 
@@ -187,24 +198,15 @@ class ParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
               
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
               
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
 
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if 
rebaseDateTime =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val rebasedMicros = 
rebaseGregorianToJulianMicros(row.getLong(ordinal))
-              recordConsumer.addLong(rebasedMicros)
-
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
             (row: SpecializedGetters, ordinal: Int) =>
-              recordConsumer.addLong(row.getLong(ordinal))
-
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if 
rebaseDateTime =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val rebasedMicros = 
rebaseGregorianToJulianMicros(row.getLong(ordinal))
-              val millis = DateTimeUtils.toMillis(rebasedMicros)
-              recordConsumer.addLong(millis)
+              val micros = row.getLong(ordinal)
+              recordConsumer.addLong(timestampRebaseFunc(micros))
 
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
             (row: SpecializedGetters, ordinal: Int) =>
-              val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+              val micros = row.getLong(ordinal)
+              val millis = DateTimeUtils.toMillis(timestampRebaseFunc(micros))
               recordConsumer.addLong(millis)
         }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 1925fa1..3b482b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -37,6 +37,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{AtomicType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -116,8 +117,9 @@ case class ParquetPartitionReaderFactory(
   private def buildReaderBase[T](
       file: PartitionedFile,
       buildReaderFunc: (
-        ParquetInputSplit, InternalRow, TaskAttemptContextImpl, 
Option[FilterPredicate],
-          Option[ZoneId], Boolean) => RecordReader[Void, T]): 
RecordReader[Void, T] = {
+        ParquetInputSplit, InternalRow, TaskAttemptContextImpl,
+          Option[FilterPredicate], Option[ZoneId],
+          LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): 
RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
     val filePath = new Path(new URI(file.filePath))
@@ -169,12 +171,11 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
     }
-    val rebaseDatetime = DataSourceUtils.needRebaseDateTime(
-      footerFileMetaData.getKeyValueMetaData.get).getOrElse {
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-    }
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     val reader = buildReaderFunc(
-      split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, 
rebaseDatetime)
+      split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, 
datetimeRebaseMode)
     reader.initialize(split, hadoopAttemptContext)
     reader
   }
@@ -189,12 +190,12 @@ case class ParquetPartitionReaderFactory(
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
       convertTz: Option[ZoneId],
-      needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = {
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, 
InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
     // ParquetRecordReader returns InternalRow
     val readSupport = new ParquetReadSupport(
-      convertTz, enableVectorizedReader = false, needDateTimeRebase)
+      convertTz, enableVectorizedReader = false, datetimeRebaseMode)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
       new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -220,11 +221,11 @@ case class ParquetPartitionReaderFactory(
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
       convertTz: Option[ZoneId],
-      rebaseDatetime: Boolean): VectorizedParquetRecordReader = {
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value): 
VectorizedParquetRecordReader = {
     val taskContext = Option(TaskContext.get())
     val vectorizedReader = new VectorizedParquetRecordReader(
       convertTz.orNull,
-      rebaseDatetime,
+      datetimeRebaseMode.toString,
       enableOffHeapColumnVector && taskContext.isDefined,
       capacity)
     val iter = new RecordReaderIterator(vectorizedReader)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
index aa47d36..d6167f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY
 import 
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 
 object DateTime extends Enumeration {
@@ -161,9 +162,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
               Seq(true, false).foreach { modernDates =>
                 Seq(false, true).foreach { rebase =>
                   benchmark.addCase(caseName(modernDates, dateTime, 
Some(rebase)), 1) { _ =>
+                    val mode = if (rebase) LEGACY else CORRECTED
                     withSQLConf(
                       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> 
getOutputType(dateTime),
-                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> 
rebase.toString) {
+                      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
mode.toString) {
                       genDF(rowsNum, dateTime, modernDates)
                         .write
                         .mode("overwrite")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index cf2c7c8..87b4db3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -41,7 +41,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, 
SparkUpgradeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -892,41 +893,67 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
           val df = 
Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-          df.write.parquet(path3_0)
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException](df.write.parquet(path3_0))
+          
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = intercept[SparkException](spark.read.parquet(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString) {
+            df.write.mode("overwrite").parquet(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
             df.write.parquet(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, 
path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+
+          // For Parquet files written by Spark 3.0, we know the writer info 
and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> 
LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, 
path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+          }
         } else {
           val df = 
Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
           withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
-            df.write.parquet(path3_0)
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+            // By default we should fail to write ancient datetime values.
+            var e = intercept[SparkException](df.write.parquet(path3_0))
+            
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+            // By default we should fail to read ancient datetime values.
+            e = 
intercept[SparkException](spark.read.parquet(path2_4).collect())
+            assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString) {
+              df.write.mode("overwrite").parquet(path3_0)
+            }
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
               df.write.parquet(path3_0_rebase)
             }
           }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, 
path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          // For Parquet files written by Spark 3.0, we know the writer info 
and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> 
LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, 
path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          }
         }
       }
     }
 
     Seq(false, true).foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
vectorized.toString) {
-        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> 
"true") {
-          checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", 
"1001-01-01")
-          checkReadMixedFiles(
-            "before_1582_timestamp_micros_v2_4.snappy.parquet",
-            "TIMESTAMP_MICROS",
-            "1001-01-01 01:02:03.123456")
-          checkReadMixedFiles(
-            "before_1582_timestamp_millis_v2_4.snappy.parquet",
-            "TIMESTAMP_MILLIS",
-            "1001-01-01 01:02:03.123")
-        }
+        checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", 
"1001-01-01")
+        checkReadMixedFiles(
+          "before_1582_timestamp_micros_v2_4.snappy.parquet",
+          "TIMESTAMP_MICROS",
+          "1001-01-01 01:02:03.123456")
+        checkReadMixedFiles(
+          "before_1582_timestamp_millis_v2_4.snappy.parquet",
+          "TIMESTAMP_MILLIS",
+          "1001-01-01 01:02:03.123")
 
         // INT96 is a legacy timestamp format and we always rebase the seconds 
for it.
         checkAnswer(readResourceParquetFile(
@@ -948,7 +975,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
           withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
             withTempPath { dir =>
               val path = dir.getAbsolutePath
-              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key 
-> "true") {
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
                 Seq.tabulate(N)(_ => tsStr).toDF("tsS")
                   .select($"tsS".cast("timestamp").as("ts"))
                   .repartition(1)
@@ -960,10 +987,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
               Seq(false, true).foreach { vectorized =>
                 withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
vectorized.toString) {
                   // The file metadata indicates if it needs rebase or not, so 
we can always get the
-                  // correct result regardless of the "rebaseInRead" config.
-                  Seq(true, false).foreach { rebase =>
+                  // correct result regardless of the "rebase mode" config.
+                  Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
                     withSQLConf(
-                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> 
rebase.toString) {
+                      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> 
mode.toString) {
                       checkAnswer(
                         spark.read.parquet(path),
                         Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
@@ -991,7 +1018,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     Seq(false, true).foreach { dictionaryEncoding =>
       withTempPath { dir =>
         val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> 
"true") {
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
LEGACY.toString) {
           Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS")
             .select($"dateS".cast("date").as("date"))
             .repartition(1)
@@ -1002,10 +1029,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
         Seq(false, true).foreach { vectorized =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
vectorized.toString) {
-            // The file metadata indicates if it needs rebase or not, so we 
can always get
-            // the correct result regardless of the "rebaseInRead" config.
-            Seq(true, false).foreach { rebase =>
-              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key 
-> rebase.toString) {
+            // The file metadata indicates if it needs rebase or not, so we 
can always get the
+            // correct result regardless of the "rebase mode" config.
+            Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+              withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> 
mode.toString) {
                 checkAnswer(
                   spark.read.parquet(path),
                   Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 42b6862..cbea741 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
@@ -151,7 +152,10 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils with Tes
               Seq(false)
             }
             java8ApiConfValues.foreach { java8Api =>
-              withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> 
java8Api.toString) {
+              withSQLConf(
+                SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+                SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString,
+                SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> 
CORRECTED.toString) {
                 val dataGenerator = RandomDataGenerator.forType(
                   dataType = dataType,
                   nullable = true,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to