This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 33df1b6d237c [SPARK-51874][SQL][FOLLOW-UP] Revert API changes of rebase methods in DataSourceUtils and AvroOptions
33df1b6d237c is described below

commit 33df1b6d237ca426d862086dd20c0e747b4492c1
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Wed Aug 20 21:56:13 2025 +0800

    [SPARK-51874][SQL][FOLLOW-UP] Revert API changes of rebase methods in DataSourceUtils and AvroOptions
    
    ### What changes were proposed in this pull request?
    
    A follow-up similar to https://github.com/apache/spark/pull/52065. This PR restores
    the rebase APIs in `DataSourceUtils` for compatibility with external Spark plugins,
    and also in `AvroOptions`, which simplifies the code.
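
    For reference, a sketch of the restored String-based signatures in
    `DataSourceUtils`, as they appear in the diff below:

    ```scala
    // Restored: modeByConfig is a plain String again rather than
    // LegacyBehaviorPolicy.Value, matching the pre-SPARK-51874 API.
    def datetimeRebaseSpec(
        lookupFileMeta: String => String,
        modeByConfig: String): RebaseSpec

    def int96RebaseSpec(
        lookupFileMeta: String => String,
        modeByConfig: String): RebaseSpec
    ```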
    
    ### Why are the changes needed?
    
    External plugins may use `DataSourceUtils` directly; see
    https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala#L194
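
    A minimal sketch of such a call site (illustrative only, not Hudi's exact
    code; `footerFileMetaData` is an assumed Parquet footer handle):

    ```scala
    // With the String-based signature restored, plugin code compiled against
    // the old API keeps linking without recompilation.
    val rebaseSpec: RebaseSpec = DataSourceUtils.datetimeRebaseSpec(
      footerFileMetaData.getKeyValueMetaData.get, // lookupFileMeta: String => String
      "CORRECTED")                                // modeByConfig: String
    ```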
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #52074 from cloud-fan/follow.
    
    Lead-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala   | 4 ++--
 .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala    | 8 ++++----
 .../apache/spark/sql/execution/datasources/DataSourceUtils.scala  | 8 ++++----
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala     | 8 +++-----
 .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala    | 8 +++-----
 5 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 65fafb5a34c6..f66b5bd988c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -58,7 +58,7 @@ private[sql] class AvroDeserializer(
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
@@ -66,7 +66,7 @@ private[sql] class AvroDeserializer(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index ab3607d1bd7a..da42333fad0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 10cfe9f145f6..d43c9eab0a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -128,7 +128,7 @@ object DataSourceUtils extends PredicateHelper {
 
   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@ object DataSourceUtils extends PredicateHelper {
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@ object DataSourceUtils extends PredicateHelper {
 
   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@ object DataSourceUtils extends PredicateHelper {
 
   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c56c947e3da5..661be2b9cfa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 4674320e8498..70ae8068a03a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
 
   private val parquetReaderCallback = new ParquetReaderCallback()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
