This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 596d03fc541c  [SPARK-53415][SQL] Simplify options for builtin FileFormats
596d03fc541c is described below

commit 596d03fc541cd9433fdfd511e131aa1cbc603a85
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu Aug 28 09:18:19 2025 -0700

    [SPARK-53415][SQL] Simplify options for builtin FileFormats

    ### What changes were proposed in this pull request?

    Simplify the interoperation between SQLConf and file-format options in the remaining FileFormats, following the approach already taken for TextBasedFileFormats (#51398 / #51398).

    ### Why are the changes needed?

    - Reduce code duplication
    - Restore type annotations for IDEs

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #52160 from yaooqinn/SPARK-53415.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  9 +++--
 .../datasources/binaryfile/BinaryFileFormat.scala  |  6 ++-
 .../execution/datasources/orc/OrcFileFormat.scala  | 19 +++++----
 .../datasources/parquet/ParquetFileFormat.scala    | 46 +++++++++++-----------
 .../spark/sql/hive/execution/HiveFileFormat.scala  |  8 +++-
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  | 16 +++++---
 6 files changed, 61 insertions(+), 43 deletions(-)
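Throughout the patch below, direct `sparkSession.sessionState.conf` lookups are replaced by helpers mixed in from `SessionStateHelper` (`getSqlConf`, `getSparkConf`, `getHadoopConf`). A rough sketch of what such a mixin can look like follows; the trait name and body here are illustrative assumptions, not the actual `org.apache.spark.sql.internal.SessionStateHelper`:

```scala
// Illustrative sketch only; the real SessionStateHelper may differ. Because
// SparkSession.sessionState is private[sql], this compiles only from code under
// the org.apache.spark.sql package, which is where the builtin FileFormats live.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

trait SessionStateHelperSketch {
  // Session-scoped SQL configuration (spark.sql.* settings).
  def getSqlConf(sparkSession: SparkSession): SQLConf =
    sparkSession.sessionState.conf

  // Application-wide Spark configuration (e.g. spark.speculation).
  def getSparkConf(sparkSession: SparkSession): SparkConf =
    sparkSession.sparkContext.getConf

  // Hadoop configuration with the per-relation options layered on top.
  def getHadoopConf(sparkSession: SparkSession, options: Map[String, String]): Configuration =
    sparkSession.sessionState.newHadoopConfWithOptions(options)
}
```

With such a mixin in place, a FileFormat writes `getSqlConf(sparkSession).orcFilterPushDown` instead of `sparkSession.sessionState.conf.orcFilterPushDown`, which is the substitution the hunks below make file by file.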
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index bf35a78488e4..90781d4ad707 100755
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -34,13 +34,16 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration

 private[sql] class AvroFileFormat extends FileFormat
-  with DataSourceRegister with Logging with Serializable {
+  with DataSourceRegister
+  with SessionStateHelper
+  with Logging
+  with Serializable {

   AvroFileFormat.registerCustomAvroTypes()

@@ -73,7 +76,7 @@ private[sql] class AvroFileFormat extends FileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    AvroUtils.prepareWrite(spark.sessionState.conf, job, options, dataSchema)
+    AvroUtils.prepareWrite(getSqlConf(spark), job, options, dataSchema)
   }

   override def buildReader(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index e242609fa58b..57e0efb993fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SessionStateHelper
 import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
 import org.apache.spark.sql.types._
@@ -55,7 +56,8 @@ import org.apache.spark.util.SerializableConfiguration
  *   .load("/path/to/fileDir");
  * }}}
  */
-case class BinaryFileFormat() extends FileFormat with DataSourceRegister {
+case class BinaryFileFormat() extends FileFormat
+  with DataSourceRegister with SessionStateHelper {

   import BinaryFileFormat._

@@ -98,7 +100,7 @@ case class BinaryFileFormat() extends FileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
     val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
-    val maxLength = sparkSession.sessionState.conf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH)
+    val maxLength = getSqlConf(sparkSession).getConf(SOURCES_BINARY_FILE_MAX_LENGTH)

     file: PartitionedFile => {
       val path = file.toPath
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 86528bf7a0af..40d10d0e4403 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SessionStateHelper
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -48,6 +49,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
 class OrcFileFormat
   extends FileFormat
   with DataSourceRegister
+  with SessionStateHelper
   with Serializable {

   override def shortName(): String = "orc"
@@ -70,7 +72,8 @@ class OrcFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+    val sqlConf = getSqlConf(sparkSession)
+    val orcOptions = new OrcOptions(options, sqlConf)

     val conf = job.getConfiguration

@@ -79,7 +82,7 @@ class OrcFileFormat
     conf.asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

-    val batchSize = sparkSession.sessionState.conf.orcVectorizedWriterBatchSize
+    val batchSize = sqlConf.orcVectorizedWriterBatchSize

     new OutputWriterFactory {
       override def newInstance(
@@ -101,10 +104,10 @@ class OrcFileFormat
   }

   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    val conf = sparkSession.sessionState.conf
-    conf.orcVectorizedReaderEnabled &&
+    val sqlConf = getSqlConf(sparkSession)
+    sqlConf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
-        s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
+        s.dataType, sqlConf.orcVectorizedReaderNestedColumnEnabled))
   }

   override def isSplitable(
@@ -136,7 +139,7 @@ class OrcFileFormat
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
-    val sqlConf = sparkSession.sessionState.conf
+    val sqlConf = getSqlConf(sparkSession)
     val capacity = sqlConf.orcVectorizedReaderBatchSize

     // Should always be set by FileSourceScanExec creating this.
@@ -163,8 +166,8 @@ class OrcFileFormat
     val broadcastedConf =
       SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
-    val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val orcFilterPushDown = sqlConf.orcFilterPushDown

     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 661be2b9cfa0..be6e5d188667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -52,6 +52,7 @@ import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 class ParquetFileFormat
   extends FileFormat
   with DataSourceRegister
+  with SessionStateHelper
   with Logging
   with Serializable {

@@ -68,7 +69,7 @@ class ParquetFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val sqlConf = sparkSession.sessionState.conf
+    val sqlConf = getSqlConf(sparkSession)
     val parquetOptions = new ParquetOptions(options, sqlConf)
     ParquetUtils.prepareWrite(sqlConf, job, dataSchema, parquetOptions)
   }
@@ -84,8 +85,7 @@ class ParquetFileFormat
    * Returns whether the reader can return the rows as batch or not.
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(getSqlConf(sparkSession), schema)
   }

   override def vectorTypes(
@@ -128,6 +128,7 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    val sqlConf = getSqlConf(sparkSession)
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -137,27 +138,27 @@ class ParquetFileFormat
       requiredSchema.json)
     hadoopConf.set(
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
+      sqlConf.sessionLocalTimeZone)
     hadoopConf.setBoolean(
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+      sqlConf.nestedSchemaPruningEnabled)
     hadoopConf.setBoolean(
       SQLConf.CASE_SENSITIVE.key,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      sqlConf.caseSensitiveAnalysis)

     // Sets flags for `ParquetToSparkSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.sessionState.conf.isParquetBinaryAsString)
+      sqlConf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      sqlConf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      sqlConf.parquetInferTimestampNTZEnabled)
     hadoopConf.setBoolean(
       SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+      sqlConf.legacyParquetNanosAsLong)

     val broadcastedHadoopConf =
@@ -167,7 +168,6 @@ class ParquetFileFormat
     // If true, enable using the custom RecordReader for parquet. This only works for
     // a subset of the types (no complex types).
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-    val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader: Boolean =
       ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
@@ -181,13 +181,13 @@ class ParquetFileFormat
     val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+    val parquetOptions = new ParquetOptions(options, sqlConf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
-    val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+    val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
       options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
         throw new IllegalArgumentException(
           "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
@@ -380,11 +380,12 @@ object ParquetFileFormat extends Logging {
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
+    val sqlConf = SessionStateHelper.getSqlConf(sparkSession)
     val converter = new ParquetToSparkSchemaConverter(
-      sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
-      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+      sqlConf.isParquetBinaryAsString,
+      sqlConf.isParquetINT96AsTimestamp,
+      inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled,
+      nanosAsLong = sqlConf.legacyParquetNanosAsLong)

     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -478,10 +479,11 @@ object ParquetFileFormat extends Logging {
       parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
-    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
-    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
-    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
+    val sqlConf = SessionStateHelper.getSqlConf(sparkSession)
+    val assumeBinaryIsString = sqlConf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlConf.isParquetINT96AsTimestamp
+    val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled
+    val nanosAsLong = sqlConf.legacyParquetNanosAsLong

     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 33bf6ae1554c..5506cf8dae07 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.internal.SessionStateHelper
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableJobConf
@@ -49,7 +50,10 @@ import org.apache.spark.util.SerializableJobConf
  * TODO: implement the read logic.
  */
 case class HiveFileFormat(fileSinkConf: FileSinkDesc)
-  extends FileFormat with DataSourceRegister with Logging {
+  extends FileFormat
+  with SessionStateHelper
+  with DataSourceRegister
+  with Logging {

   def this() = this(null)

@@ -75,7 +79,7 @@ case class HiveFileFormat(fileSinkConf: FileSinkDesc)
     // When speculation is on and output committer class name contains "Direct", we should warn
     // users that they may loss data if they are using a direct output committer.
-    val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED)
+    val speculationEnabled = getSparkConf(sparkSession).get(SPECULATION_ENABLED)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 440fc582b93f..ba37e5c176de 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.{OrcFilters, OrcOptions, OrcUtils}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
+import org.apache.spark.sql.internal.SessionStateHelper
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -54,7 +55,10 @@ import org.apache.spark.util.SerializableConfiguration
  * `FileFormat` for reading ORC files. If this is moved or renamed, please update
 * `DataSource`'s backwardCompatibilityMap.
  */
-case class OrcFileFormat() extends FileFormat with DataSourceRegister with Serializable {
+case class OrcFileFormat() extends FileFormat
+  with DataSourceRegister
+  with SessionStateHelper
+  with Serializable {

   override def shortName(): String = "orc"

@@ -64,14 +68,14 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+    val orcOptions = new OrcOptions(options, getSqlConf(sparkSession))
     if (orcOptions.mergeSchema) {
       SchemaMergeUtils.mergeSchemasInParallel(
         sparkSession, options, files, OrcFileOperator.readOrcSchemasInParallel)
     } else {
       OrcFileOperator.readSchema(
         files.map(_.getPath.toString),
-        Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
+        Some(getHadoopConf(sparkSession, options)),
         orcOptions.ignoreCorruptFiles
       )
     }
@@ -83,7 +87,7 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {

-    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+    val orcOptions = new OrcOptions(options, getSqlConf(sparkSession))

     val configuration = job.getConfiguration

@@ -133,7 +137,7 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {

-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+    if (getSqlConf(sparkSession).orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
         hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
@@ -144,7 +148,7 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
     val broadcastedHadoopConf =
       SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf)
     val ignoreCorruptFiles =
-      new OrcOptions(options, sparkSession.sessionState.conf).ignoreCorruptFiles
+      new OrcOptions(options, getSqlConf(sparkSession)).ignoreCorruptFiles

     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
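One design detail visible in the patch: classes such as `ParquetFileFormat` mix the helper in and call `getSqlConf(sparkSession)` directly, while `object ParquetFileFormat` reaches it as `SessionStateHelper.getSqlConf(sparkSession)`. A minimal sketch of a trait-plus-object arrangement that supports both call styles, using hypothetical names rather than the actual Spark internals:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Hypothetical illustration; the real SessionStateHelper may be organized differently.
trait ConfAccess {
  // Mixin style: a class extending ConfAccess calls getSqlConf(spark) directly.
  def getSqlConf(sparkSession: SparkSession): SQLConf = sparkSession.sessionState.conf
}

// Object style: code that does not mix the trait in (for example helper methods on a
// companion object) calls ConfAccess.getSqlConf(spark).
object ConfAccess extends ConfAccess
```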