This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 33df1b6d237c [SPARK-51874][SQL][FOLLOW-UP] Revert API changes of rebase methods in DataSourceUtils and AvroOptions
33df1b6d237c is described below

commit 33df1b6d237ca426d862086dd20c0e747b4492c1
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Wed Aug 20 21:56:13 2025 +0800

    [SPARK-51874][SQL][FOLLOW-UP] Revert API changes of rebase methods in DataSourceUtils and AvroOptions

    ### What changes were proposed in this pull request?

    A follow-up similar to https://github.com/apache/spark/pull/52065 . This PR restores the rebase APIs in `DataSourceUtils`, for compatibility with external Spark plugins, and in `AvroOptions`, to simplify the code.

    ### Why are the changes needed?

    External plugins may use `DataSourceUtils` directly; see https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala#L194

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    N/A

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #52074 from cloud-fan/follow.

    Lead-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala   | 4 ++--
 .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala    | 8 ++++----
 .../apache/spark/sql/execution/datasources/DataSourceUtils.scala  | 8 ++++----
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala     | 8 +++-----
 .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala    | 8 +++-----
 5 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 65fafb5a34c6..f66b5bd988c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -58,7 +58,7 @@ private[sql] class AvroDeserializer(
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
@@ -66,7 +66,7 @@ private[sql] class AvroDeserializer(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,
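(Editorially, the hunks above restore the convenience constructor of `AvroDeserializer` to take the rebase mode as a plain `String` and convert it internally. A minimal sketch of a call site under the restored signature follows; the Avro schema, catalyst type, and argument values are illustrative and not taken from this patch, and since the class is `private[sql]`, the sketch assumes code compiled under the `org.apache.spark.sql` package.)

```scala
package org.apache.spark.sql  // AvroDeserializer is private[sql]

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object RestoredConstructorSketch {
  // Illustrative inputs, not taken from this patch.
  val avroSchema: Schema =
    SchemaBuilder.record("rec").fields().requiredInt("id").endRecord()
  val catalystType = StructType(Seq(StructField("id", IntegerType)))

  // The rebase mode is a plain String again, but it must still be a valid
  // LegacyBehaviorPolicy name ("EXCEPTION", "LEGACY", or "CORRECTED"),
  // because the constructor converts it via LegacyBehaviorPolicy.withName.
  val deserializer = new AvroDeserializer(
    avroSchema,
    catalystType,
    datetimeRebaseMode = "CORRECTED",
    useStableIdForUnionType = false,
    stableIdPrefixForUnionType = "",
    recursiveFieldMaxDepth = -1)
}
```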
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index ab3607d1bd7a..da42333fad0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 10cfe9f145f6..d43c9eab0a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -128,7 +128,7 @@ object DataSourceUtils extends PredicateHelper {
   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@ object DataSourceUtils extends PredicateHelper {
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@ object DataSourceUtils extends PredicateHelper {
   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@ object DataSourceUtils extends PredicateHelper {
   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
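(The `String`-based signatures above are what external plugins such as the linked Hudi file format compile against. Below is a hedged sketch of that call pattern, assuming the standard Parquet footer key-value metadata map; the helper name `rebaseSpecFor`, the import path for `RebaseSpec`, and the use of `SQLConf.PARQUET_REBASE_MODE_IN_READ` as the fallback are illustrative, not part of this patch.)

```scala
import java.util.{Map => JMap}

import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf

object RebaseSpecSketch {
  // Hypothetical plugin helper: resolve the datetime rebase spec for one
  // Parquet file. DataSourceUtils consults the writer metadata stored in
  // the file footer first and only falls back to the conf value, which
  // this revert lets callers pass as a plain String again.
  def rebaseSpecFor(footerKeyValueMeta: JMap[String, String]): RebaseSpec = {
    DataSourceUtils.datetimeRebaseSpec(
      footerKeyValueMeta.get, // lookupFileMeta: String => String
      SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)
  }
}
```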
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c56c947e3da5..661be2b9cfa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 4674320e8498..70ae8068a03a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
 
   private val parquetReaderCallback = new ParquetReaderCallback()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org