This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bfafad4d47b4 [SPARK-46440][SQL] Set the rebase configs to the `CORRECTED` mode by default bfafad4d47b4 is described below commit bfafad4d47b4f60e93d17ccc3a8dcc8bae03cf9a Author: Max Gekk <max.g...@gmail.com> AuthorDate: Mon Dec 18 11:41:29 2023 +0300 [SPARK-46440][SQL] Set the rebase configs to the `CORRECTED` mode by default ### What changes were proposed in this pull request? In the PR, I propose to set all rebase related SQL configs to the `CORRECTED` mode by default. Here are the affected configs: - spark.sql.parquet.int96RebaseModeInWrite - spark.sql.parquet.datetimeRebaseModeInWrite - spark.sql.parquet.int96RebaseModeInRead - spark.sql.parquet.datetimeRebaseModeInRead - spark.sql.avro.datetimeRebaseModeInWrite - spark.sql.avro.datetimeRebaseModeInRead ### Why are the changes needed? The configs were set to the `EXCEPTION` mode to give users a choice to select the proper mode for compatibility with old Spark versions <= 2.4.5. Those versions are not able to detect the rebase mode from meta information in parquet and avro files. Since the versions are out of broad usage, Spark starting from version 4.0.0 will write/read ancient datetimes without rebasing and without any exceptions. This should be more convenient for users. ### Does this PR introduce _any_ user-facing change? Yes, it can if user's code expects an exception while reading/writing ancient datetimes. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite" $ build/sbt "test:testOnly *ParquetRebaseDatetimeV2Suite" $ build/sbt "test:testOnly *RebaseDateTimeSuite" $ build/sbt "test:testOnly *AvroV1Suite" $ build/sbt "test:testOnly *AvroV2Suite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44388 from MaxGekk/set-rebase-conf-corrected. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 16 ++++++++++------ .../scala/org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++------ .../datasources/parquet/ParquetRebaseDatetimeSuite.scala | 6 +++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index e904ecdb3886..8a5634235639 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2347,13 +2347,17 @@ abstract class AvroSuite | ] |}""".stripMargin - // By default we should fail to write ancient datetime values. - val e = intercept[SparkException] { - df.write.format("avro").option("avroSchema", avroSchema).save(path3_x) + withSQLConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + val e = intercept[SparkException] { + df.write.format("avro").option("avroSchema", avroSchema).save(path3_x) + } + assert(e.getCause.getCause.isInstanceOf[SparkUpgradeException]) } - assert(e.getCause.getCause.isInstanceOf[SparkUpgradeException]) checkDefaultLegacyRead(oldPath) + // By default we should not fail to write ancient datetime values. + df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_x) + withSQLConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_x) } @@ -2378,9 +2382,9 @@ abstract class AvroSuite } def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect() Seq( - // By default we should fail to read ancient datetime values when parquet files don't + // By default we should not fail to read ancient datetime values when parquet files don't // contain Spark version. 
- "2_4_5" -> failInRead _, + "2_4_5" -> successInRead _, "2_4_6" -> successInRead _, "3_2_0" -> successInRead _ ).foreach { case (version, checkDefaultRead) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9918d583d49e..23217f94d64e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4069,7 +4069,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val PARQUET_REBASE_MODE_IN_WRITE = buildConf("spark.sql.parquet.datetimeRebaseModeInWrite") @@ -4087,7 +4087,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val PARQUET_INT96_REBASE_MODE_IN_READ = buildConf("spark.sql.parquet.int96RebaseModeInRead") @@ -4103,7 +4103,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val PARQUET_REBASE_MODE_IN_READ = buildConf("spark.sql.parquet.datetimeRebaseModeInRead") @@ -4122,7 +4122,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val AVRO_REBASE_MODE_IN_WRITE = buildConf("spark.sql.avro.datetimeRebaseModeInWrite") @@ -4137,7 +4137,7 @@ object 
SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val AVRO_REBASE_MODE_IN_READ = buildConf("spark.sql.avro.datetimeRebaseModeInRead") @@ -4153,7 +4153,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) - .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + .createWithDefault(LegacyBehaviorPolicy.CORRECTED.toString) val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT = buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 4f9064113454..35775ebbcbab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -173,9 +173,9 @@ abstract class ParquetRebaseDatetimeSuite assert(e.getCause.isInstanceOf[SparkUpgradeException]) } Seq( - // By default we should fail to read ancient datetime values when parquet files don't + // By default we should not fail to read ancient datetime values when parquet files don't // contain Spark version. 
- "2_4_5" -> failInRead _, + "2_4_5" -> successInRead _, "2_4_6" -> successInRead _, "3_2_0" -> successInRead _).foreach { case (version, checkDefaultRead) => withAllParquetReaders { @@ -201,7 +201,7 @@ abstract class ParquetRebaseDatetimeSuite } } Seq( - "2_4_5" -> failInRead _, + "2_4_5" -> successInRead _, "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) => withAllParquetReaders { Seq("plain", "dict").foreach { enc => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org