This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a92ef00145b [SPARK-38767][SQL] Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options
a92ef00145b is described below

commit a92ef00145b264013e11de12f2c7cee62c28198d
Author: yaohua <yaohua.z...@databricks.com>
AuthorDate: Tue Apr 12 12:50:13 2022 -0700

    [SPARK-38767][SQL] Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options
    
    ### What changes were proposed in this pull request?
    Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options for both V1 and V2 file-based data sources.
    ```
    spark.read...option("ignoreCorruptFiles", "true")...
    spark.read...option("ignoreMissingFiles", "true")...
    ```
    
    ### Why are the changes needed?
    Improve UX: these settings can now be applied per read operation through reader options instead of only through the session-wide SQL configuration.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    Previously:
    ```
    spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
    spark.sql("set spark.sql.files.ignoreMissingFiles=true")
    spark.read...
    spark.read...
    ```
    
    Now:
    ```
    spark.read...option("ignoreCorruptFiles", "true")...
    spark.read...option("ignoreMissingFiles", "true")...
    ```
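
    A minimal combined sketch (the path is hypothetical): an explicitly set reader option takes effect for that read, because the session configuration is only consulted as a fallback.
    ```
    val df = spark.read
      .option("ignoreCorruptFiles", "true")
      .option("ignoreMissingFiles", "true")
      .parquet("/data/events")  // hypothetical directory with some corrupt or deleted files
    df.count()
    ```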
    
    ### How was this patch tested?
    Enhanced existing unit tests covering `ignoreMissingFiles` and `ignoreCorruptFiles`.
    
    Closes #36069 from Yaohua628/spark-38767.
    
    Authored-by: yaohua <yaohua.z...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroOptions.scala    |  4 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  5 ++-
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   | 12 +++---
 docs/sql-data-sources-generic-options.md           |  4 +-
 .../examples/sql/JavaSQLDataSourceExample.java     | 19 +++++++--
 examples/src/main/python/sql/datasource.py         | 21 ++++++++--
 examples/src/main/r/RSparkSQLExample.R             | 14 +++++--
 .../spark/examples/sql/SQLDataSourceExample.scala  | 19 +++++++--
 .../spark/sql/catalyst/FileSourceOptions.scala     | 42 +++++++++++++++++++
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala |  3 +-
 .../spark/sql/catalyst/json/JSONOptions.scala      |  3 +-
 .../spark/sql/execution/DataSourceScanExec.scala   |  8 ++--
 .../sql/execution/datasources/FileScanRDD.scala    | 10 +++--
 .../execution/datasources/SchemaMergeUtils.scala   |  5 ++-
 .../execution/datasources/orc/OrcFileFormat.scala  |  5 ++-
 .../sql/execution/datasources/orc/OrcOptions.scala |  3 +-
 .../sql/execution/datasources/orc/OrcUtils.scala   |  7 ++--
 .../datasources/parquet/ParquetOptions.scala       |  3 +-
 .../execution/datasources/text/TextOptions.scala   |  3 +-
 .../datasources/v2/FilePartitionReader.scala       | 11 ++---
 .../v2/FilePartitionReaderFactory.scala            |  9 +++--
 .../v2/csv/CSVPartitionReaderFactory.scala         | 16 ++++----
 .../v2/json/JsonPartitionReaderFactory.scala       | 10 ++---
 .../v2/orc/OrcPartitionReaderFactory.scala         |  8 ++--
 .../sql/execution/datasources/v2/orc/OrcScan.scala |  6 ++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  8 ++--
 .../v2/text/TextPartitionReaderFactory.scala       |  8 ++--
 .../spark/sql/FileBasedDataSourceSuite.scala       | 31 ++++++++------
 .../datasources/parquet/ParquetQuerySuite.scala    | 47 +++++++++++++++-------
 29 files changed, 241 insertions(+), 103 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 48b2c3481a6..7f6a274753c 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -33,7 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
  */
 private[sql] class AvroOptions(
     @transient val parameters: CaseInsensitiveMap[String],
-    @transient val conf: Configuration) extends Logging with Serializable {
+    @transient val conf: Configuration)
+  extends FileSourceOptions(parameters) with Logging {
 
   def this(parameters: Map[String, String], conf: Configuration) = {
     this(CaseInsensitiveMap(parameters), conf)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index ef9d22f35d0..d03902faab9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -35,7 +35,8 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -57,7 +58,7 @@ private[sql] object AvroUtils extends Logging {
     val avroSchema = parsedOptions.schema
       .getOrElse {
         inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
-          spark.sessionState.conf.ignoreCorruptFiles)
+          new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
       }
 
     SchemaConverters.toSqlType(avroSchema).dataType match {
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index a4dfdbfe68f..3ad63f113fe 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param dataSchema Schema of AVRO files.
  * @param readDataSchema Required data schema of AVRO files.
  * @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing AVRO files.
+ * @param options Options for parsing AVRO files.
  */
 case class AvroPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -54,15 +54,15 @@ case class AvroPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    parsedOptions: AvroOptions,
+    options: AvroOptions,
     filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging {
-  private val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
 
   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
-    val userProvidedSchema = parsedOptions.schema
+    val userProvidedSchema = options.schema
 
-    if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
+    if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
       val reader = {
         val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
         try {
@@ -104,7 +104,7 @@ case class AvroPartitionReaderFactory(
         override val deserializer = new AvroDeserializer(
           userProvidedSchema.getOrElse(reader.getSchema),
           readDataSchema,
-          parsedOptions.positionalFieldMatching,
+          options.positionalFieldMatching,
           datetimeRebaseMode,
           avroFilters)
         override val stopPosition = partitionedFile.start + partitionedFile.length
diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md
index 2e4fc879a43..7835371ec43 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -38,7 +38,7 @@ dir1/
 
 ### Ignore Corrupt Files
 
-Spark allows you to use `spark.sql.files.ignoreCorruptFiles` to ignore corrupt files while reading data
+Spark allows you to use the configuration `spark.sql.files.ignoreCorruptFiles` or the data source option `ignoreCorruptFiles` to ignore corrupt files while reading data
 from files. When set to true, the Spark jobs will continue to run when encountering corrupted files and
 the contents that have been read will still be returned.
 
@@ -64,7 +64,7 @@ To ignore corrupt files while reading data files, you can use:
 
 ### Ignore Missing Files
 
-Spark allows you to use `spark.sql.files.ignoreMissingFiles` to ignore missing files while reading data
+Spark allows you to use the configuration `spark.sql.files.ignoreMissingFiles` or the data source option `ignoreMissingFiles` to ignore missing files while reading data
 from files. Here, missing file really means the deleted file under directory after you construct the
 `DataFrame`. When set to true, the Spark jobs will continue to run when encountering missing files and
 the contents that have been read will still be returned.
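
A minimal Scala sketch of the two documented ways to ignore corrupt files (the paths reuse the example resources referenced by this patch); the option form applies only to that read, and the same pattern works for `ignoreMissingFiles`:
```
// via the session configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
val dfFromConf = spark.read.parquet("examples/src/main/resources/dir1/")

// via the data source option, scoped to this single read
val dfFromOption = spark.read.option("ignoreCorruptFiles", "true")
  .parquet("examples/src/main/resources/dir1/")
```
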
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 5dcf321a4c8..c0960540b49 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -115,13 +115,26 @@ public class JavaSQLDataSourceExample {
 
   private static void runGenericFileSourceOptionsExample(SparkSession spark) {
     // $example on:ignore_corrupt_files$
-    // enable ignore corrupt files
+    // enable ignore corrupt files via the data source option
+    // dir1/file3.json is corrupt from parquet's view
+    Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
+        "examples/src/main/resources/dir1/",
+        "examples/src/main/resources/dir1/dir2/");
+    testCorruptDF0.show();
+    // +-------------+
+    // |         file|
+    // +-------------+
+    // |file1.parquet|
+    // |file2.parquet|
+    // +-------------+
+
+    // enable ignore corrupt files via the configuration
     spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
     // dir1/file3.json is corrupt from parquet's view
-    Dataset<Row> testCorruptDF = spark.read().parquet(
+    Dataset<Row> testCorruptDF1 = spark.read().parquet(
             "examples/src/main/resources/dir1/",
             "examples/src/main/resources/dir1/dir2/");
-    testCorruptDF.show();
+    testCorruptDF1.show();
     // +-------------+
     // |         file|
     // +-------------+
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index fd312dbf164..c7522cb9d34 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -28,12 +28,25 @@ from pyspark.sql import Row
 
 def generic_file_source_options_example(spark: SparkSession) -> None:
     # $example on:ignore_corrupt_files$
-    # enable ignore corrupt files
+    # enable ignore corrupt files via the data source option
+    # dir1/file3.json is corrupt from parquet's view
+    test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
+        .parquet("examples/src/main/resources/dir1/",
+                 "examples/src/main/resources/dir1/dir2/")
+    test_corrupt_df0.show()
+    # +-------------+
+    # |         file|
+    # +-------------+
+    # |file1.parquet|
+    # |file2.parquet|
+    # +-------------+
+
+    # enable ignore corrupt files via the configuration
     spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
     # dir1/file3.json is corrupt from parquet's view
-    test_corrupt_df = spark.read.parquet("examples/src/main/resources/dir1/",
-                                         "examples/src/main/resources/dir1/dir2/")
-    test_corrupt_df.show()
+    test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
+                                          "examples/src/main/resources/dir1/dir2/")
+    test_corrupt_df1.show()
     # +-------------+
     # |         file|
     # +-------------+
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 15118e118ab..a7d3ae766c5 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -101,11 +101,19 @@ df <- sql("SELECT * FROM table")
 
 # Ignore corrupt files
 # $example on:ignore_corrupt_files$
-# enable ignore corrupt files
+# enable ignore corrupt files via the data source option
+# dir1/file3.json is corrupt from parquet's view
+testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
+head(testCorruptDF0)
+#            file
+# 1 file1.parquet
+# 2 file2.parquet
+
+# enable ignore corrupt files via the configuration
 sql("set spark.sql.files.ignoreCorruptFiles=true")
 # dir1/file3.json is corrupt from parquet's view
-testCorruptDF <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
-head(testCorruptDF)
+testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
+head(testCorruptDF1)
 #            file
 # 1 file1.parquet
 # 2 file2.parquet
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 6bd2bd6d3bf..9b04994199d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -45,13 +45,26 @@ object SQLDataSourceExample {
 
   private def runGenericFileSourceOptionsExample(spark: SparkSession): Unit = {
     // $example on:ignore_corrupt_files$
-    // enable ignore corrupt files
+    // enable ignore corrupt files via the data source option
+    // dir1/file3.json is corrupt from parquet's view
+    val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
+      "examples/src/main/resources/dir1/",
+      "examples/src/main/resources/dir1/dir2/")
+    testCorruptDF0.show()
+    // +-------------+
+    // |         file|
+    // +-------------+
+    // |file1.parquet|
+    // |file2.parquet|
+    // +-------------+
+
+    // enable ignore corrupt files via the configuration
     spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
     // dir1/file3.json is corrupt from parquet's view
-    val testCorruptDF = spark.read.parquet(
+    val testCorruptDF1 = spark.read.parquet(
       "examples/src/main/resources/dir1/",
       "examples/src/main/resources/dir1/dir2/")
-    testCorruptDF.show()
+    testCorruptDF1.show()
     // +-------------+
     // |         file|
     // +-------------+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
new file mode 100644
index 00000000000..6b9826d652e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common options for the file-based data source.
+ */
+class FileSourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
+    .getOrElse(SQLConf.get.ignoreCorruptFiles)
+
+  val ignoreMissingFiles: Boolean = parameters.get(IGNORE_MISSING_FILES).map(_.toBoolean)
+    .getOrElse(SQLConf.get.ignoreMissingFiles)
+}
+
+object FileSourceOptions {
+  val IGNORE_CORRUPT_FILES = "ignoreCorruptFiles"
+  val IGNORE_MISSING_FILES = "ignoreMissingFiles"
+}
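
A minimal sketch of how the new class resolves the flags; `FileSourceOptions` and `CaseInsensitiveMap` are internal catalyst classes, so this only compiles inside Spark's own modules. A per-read option, when present, wins; otherwise the value falls back to the session configuration.
```
import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Option supplied: the per-read value is used regardless of the session conf.
val withOption = new FileSourceOptions(CaseInsensitiveMap(Map("ignoreCorruptFiles" -> "true")))
assert(withOption.ignoreCorruptFiles)

// No option supplied: falls back to SQLConf.get.ignoreCorruptFiles / ignoreMissingFiles.
val withoutOption = new FileSourceOptions(CaseInsensitiveMap(Map.empty[String, String]))
```
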
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 2a404b14bfd..9daa50ba5a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -34,7 +35,7 @@ class CSVOptions(
     val columnPruning: Boolean,
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
-  extends Logging with Serializable {
+  extends FileSourceOptions(parameters) with Logging {
 
   def this(
     parameters: Map[String, String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e801912e192..5f90dbc49c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
 import com.fasterxml.jackson.core.json.JsonReadFeature
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
@@ -38,7 +39,7 @@ private[sql] class JSONOptions(
     @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
-  extends Logging with Serializable  {
+  extends FileSourceOptions(parameters) with Logging  {
 
   def this(
     parameters: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5067cd7fa3c..5cf8aa91ea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -25,12 +25,12 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -620,7 +620,7 @@ case class FileSourceScanExec(
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
-      requiredSchema, metadataColumns)
+      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   /**
@@ -677,7 +677,7 @@ case class FileSourceScanExec(
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
-      requiredSchema, metadataColumns)
+      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 20c393a5c0e..97776413509 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -27,8 +27,9 @@ import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskC
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
@@ -69,11 +70,12 @@ class FileScanRDD(
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition],
     val readDataSchema: StructType,
-    val metadataColumns: Seq[AttributeReference] = Seq.empty)
+    val metadataColumns: Seq[AttributeReference] = Seq.empty,
+    options: FileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(Map.empty)))
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
-  private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
-  private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
+  private val ignoreCorruptFiles = options.ignoreCorruptFiles
+  private val ignoreMissingFiles = options.ignoreMissingFiles
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index 98f580f2d4a..babecfc1f38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -60,7 +62,8 @@ object SchemaMergeUtils extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
-    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    val ignoreCorruptFiles =
+      new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreCorruptFiles
 
     // Issues a Spark job to read Parquet/ORC schema in parallel.
     val partiallyMergedSchemas =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2b060c90153..02bc97cbdd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -33,9 +33,10 @@ import org.apache.orc.mapreduce._
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -133,7 +134,7 @@ class OrcFileFormat
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
-    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    val ignoreCorruptFiles = new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles
 
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index 9416996198a..ef1c2bb5b41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.orc.OrcConf.COMPRESS
 
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 
@@ -30,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
 class OrcOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
-  extends Serializable {
+  extends FileSourceOptions(parameters) {
 
   import OrcOptions._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index a68ce1a8636..79abdfe4690 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -33,12 +33,11 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -143,7 +142,7 @@ object OrcUtils extends Logging {
 
   def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
       : Option[StructType] = {
-    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    val ignoreCorruptFiles = new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     files.iterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index da0c163dd85..07ed55b0b8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 
@@ -31,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
-  extends Serializable {
+  extends FileSourceOptions(parameters) {
 
   import ParquetOptions._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index ef132162750..f1a1d465d1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.datasources.text
 
 import java.nio.charset.{Charset, StandardCharsets}
 
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
 
 /**
  * Options for the Text data source.
  */
 class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
-  extends Serializable {
+  extends FileSourceOptions(parameters) {
 
   import TextOptions._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 00efd48f951..782c1f50d80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -23,18 +23,19 @@ import scala.util.control.NonFatal
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
-import org.apache.spark.sql.internal.SQLConf
 
-class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
+class FilePartitionReader[T](
+    readers: Iterator[PartitionedFileReader[T]],
+    options: FileSourceOptions)
   extends PartitionReader[T] with Logging {
   private var currentReader: PartitionedFileReader[T] = null
 
-  private val sqlConf = SQLConf.get
-  private def ignoreMissingFiles = sqlConf.ignoreMissingFiles
-  private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+  private def ignoreMissingFiles = options.ignoreMissingFiles
+  private def ignoreCorruptFiles = options.ignoreCorruptFiles
 
   override def next(): Boolean = {
     if (currentReader == null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
index da4f9e89fde..d7b88b505b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -16,20 +16,23 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
+
+  protected def options: FileSourceOptions
+
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     assert(partition.isInstanceOf[FilePartition])
     val filePartition = partition.asInstanceOf[FilePartition]
     val iter = filePartition.files.iterator.map { file =>
       PartitionedFileReader(file, buildReader(file))
     }
-    new FilePartitionReader[InternalRow](iter)
+    new FilePartitionReader[InternalRow](iter, options)
   }
 
   override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
@@ -38,7 +41,7 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
     val iter = filePartition.files.iterator.map { file =>
       PartitionedFileReader(file, buildColumnarReader(file))
     }
-    new FilePartitionReader[ColumnarBatch](iter)
+    new FilePartitionReader[ColumnarBatch](iter, options)
   }
 
   def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index bf996ab1b31..f8a17c8eaa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param dataSchema Schema of CSV files.
  * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing CSV files.
+ * @param options Options for parsing CSV files.
  */
 case class CSVPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -44,25 +44,25 @@ case class CSVPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    parsedOptions: CSVOptions,
+    options: CSVOptions,
     filters: Seq[Filter]) extends FilePartitionReaderFactory {
 
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
     val actualDataSchema = StructType(
-      dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
     val actualReadDataSchema = StructType(
-      readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
     val parser = new UnivocityParser(
       actualDataSchema,
       actualReadDataSchema,
-      parsedOptions,
+      options,
       filters)
-    val schema = if (parsedOptions.columnPruning) actualReadDataSchema else actualDataSchema
+    val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
-      schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
-    val iter = CSVDataSource(parsedOptions).readFile(
+      schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile)
+    val iter = CSVDataSource(options).readFile(
       conf,
       file,
       parser,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
index 9737803b597..d9cd41dd560 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param dataSchema Schema of JSON files.
  * @param readDataSchema Required schema of JSON files.
  * @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing JSON files.
+ * @param options Options for parsing JSON files.
  * @param filters The filters pushed down to JSON datasource.
  */
 case class JsonPartitionReaderFactory(
@@ -45,18 +45,18 @@ case class JsonPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    parsedOptions: JSONOptionsInRead,
+    options: JSONOptionsInRead,
     filters: Seq[Filter]) extends FilePartitionReaderFactory {
 
   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val actualSchema =
-      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      StructType(readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
     val parser = new JacksonParser(
       actualSchema,
-      parsedOptions,
+      options,
       allowArrayAsStructs = true,
       filters)
-    val iter = JsonDataSource(parsedOptions).readFile(
+    val iter = JsonDataSource(options).readFile(
       broadcastedConf.value.value,
       partitionedFile,
       parser,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index ef13beaf9b4..59e3214f047 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcOptions, OrcUtils}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -49,6 +49,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  * @param dataSchema Schema of orc files.
  * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
+ * @param options Options for parsing ORC files.
  */
 case class OrcPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -57,12 +58,13 @@ case class OrcPartitionReaderFactory(
     readDataSchema: StructType,
     partitionSchema: StructType,
     filters: Array[Filter],
-    aggregation: Option[Aggregation]) extends FilePartitionReaderFactory {
+    aggregation: Option[Aggregation],
+    options: OrcOptions) extends FilePartitionReaderFactory {
   private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields)
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val capacity = sqlConf.orcVectorizedReaderBatchSize
   private val orcFilterPushDown = sqlConf.orcFilterPushDown
-  private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+  private val ignoreCorruptFiles = options.ignoreCorruptFiles
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index baf307257c3..ad8857d9803 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.orc
 
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -64,7 +67,8 @@ case class OrcScan(
     // The partition values are already truncated in `FileScan.partitions`.
     // We should use `readPartitionSchema` as the partition schema here.
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, pushedFilters, pushedAggregate)
+      dataSchema, readDataSchema, readPartitionSchema, pushedFilters, pushedAggregate,
+      new OrcOptions(options.asScala.toMap, sparkSession.sessionState.conf))
   }
 
   override def equals(obj: Any): Boolean = obj match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index ea4f5e0d287..9a25dd88ff4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -56,7 +56,7 @@ import org.apache.spark.util.SerializableConfiguration
  * @param partitionSchema Schema of partitions.
  * @param filters Filters to be pushed down in the batch scan.
  * @param aggregation Aggregation to be pushed down in the batch scan.
- * @param parquetOptions The options of Parquet datasource that are set for the read.
+ * @param options The options of Parquet datasource that are set for the read.
  */
 case class ParquetPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -66,7 +66,7 @@ case class ParquetPartitionReaderFactory(
     partitionSchema: StructType,
     filters: Array[Filter],
     aggregation: Option[Aggregation],
-    parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging {
+    options: ParquetOptions) extends FilePartitionReaderFactory with Logging {
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
   private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
@@ -81,8 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-  private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
 
   private def getFooter(file: PartitionedFile): ParquetMetadata = {
     val conf = broadcastedConf.value.value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
index 0cd184da6ef..6542c1c2c3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
@@ -36,19 +36,19 @@ import org.apache.spark.util.SerializableConfiguration
  * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
  * @param readDataSchema Required schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param textOptions Options for reading a text file.
+ * @param options Options for reading a text file.
  * */
 case class TextPartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     readDataSchema: StructType,
     partitionSchema: StructType,
-    textOptions: TextOptions) extends FilePartitionReaderFactory {
+    options: TextOptions) extends FilePartitionReaderFactory {
 
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val confValue = broadcastedConf.value.value
-    val reader = if (!textOptions.wholeText) {
-      new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
+    val reader = if (!options.wholeText) {
+      new HadoopFileLinesReader(file, options.lineSeparatorInRead, confValue)
     } else {
       new HadoopFileWholeTextReader(file, confValue)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 9e7eb1d0ad5..5011a7713a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -181,7 +181,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
   allFileBasedDataSources.foreach { format =>
     testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
-      def testIgnoreMissingFiles(): Unit = {
+      def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
         withTempDir { dir =>
           val basePath = dir.getCanonicalPath
 
@@ -197,7 +197,7 @@ class FileBasedDataSourceSuite extends QueryTest
             fs.listStatus(p).filter(_.isFile).map(_.getPath)
           }
 
-          val df = spark.read.format(format).load(
+          val df = spark.read.options(options).format(format).load(
             new Path(basePath, "first").toString,
             new Path(basePath, "second").toString,
             new Path(basePath, "third").toString,
@@ -214,20 +214,27 @@ class FileBasedDataSourceSuite extends QueryTest
         }
       }
 
+      // Test set ignoreMissingFiles via SQL Conf and Data Source reader options
       for {
-        ignore <- Seq("true", "false")
+        (ignore, options, sqlConf) <- Seq(
+          // Set via SQL Conf: leave options empty
+          ("true", Map.empty[String, String], "true"),
+          ("false", Map.empty[String, String], "false"),
+          // Set via reader options: explicitly set SQL Conf to opposite
+          ("true", Map("ignoreMissingFiles" -> "true"), "false"),
+          ("false", Map("ignoreMissingFiles" -> "false"), "true"))
         sources <- Seq("", format)
       } {
-        withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> ignore,
-          SQLConf.USE_V1_SOURCE_LIST.key -> sources) {
-            if (ignore.toBoolean) {
-              testIgnoreMissingFiles()
-            } else {
-              val exception = intercept[SparkException] {
-                testIgnoreMissingFiles()
-              }
-              assert(exception.getMessage().contains("does not exist"))
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> sources,
+          SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
+          if (ignore.toBoolean) {
+            testIgnoreMissingFiles(options)
+          } else {
+            val exception = intercept[SparkException] {
+              testIgnoreMissingFiles(options)
             }
+            assert(exception.getMessage().contains("does not exist"))
+          }
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 64944054326..426d477d38f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -306,13 +306,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
   }
 
   test("Enabling/disabling ignoreCorruptFiles") {
-    def testIgnoreCorruptFiles(): Unit = {
+    def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
       withTempDir { dir =>
         val basePath = dir.getCanonicalPath
         spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
         spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
         spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
-        val df = spark.read.parquet(
+        val df = spark.read.options(options).parquet(
           new Path(basePath, "first").toString,
           new Path(basePath, "second").toString,
           new Path(basePath, "third").toString)
@@ -320,13 +320,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       }
     }
 
-    def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+    def testIgnoreCorruptFilesWithoutSchemaInfer(options: Map[String, String]): Unit = {
       withTempDir { dir =>
         val basePath = dir.getCanonicalPath
         spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
         spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
         spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
-        val df = spark.read.schema("a long").parquet(
+        val df = spark.read.options(options).schema("a long").parquet(
           new Path(basePath, "first").toString,
           new Path(basePath, "second").toString,
           new Path(basePath, "third").toString)
@@ -334,20 +334,39 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       }
     }
 
-    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
-      testIgnoreCorruptFiles()
-      testIgnoreCorruptFilesWithoutSchemaInfer()
+    // Test ignoreCorruptFiles = true
+    Seq("SQLConf", "FormatOption").foreach { by =>
+      val (sqlConf, options) = by match {
+        case "SQLConf" => ("true", Map.empty[String, String])
+        // Explicitly set SQLConf to false but still should ignore corrupt files
+        case "FormatOption" => ("false", Map("ignoreCorruptFiles" -> "true"))
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) {
+        testIgnoreCorruptFiles(options)
+        testIgnoreCorruptFilesWithoutSchemaInfer(options)
+      }
     }
 
-    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
-      val exception = intercept[SparkException] {
-        testIgnoreCorruptFiles()
+    // Test ignoreCorruptFiles = false
+    Seq("SQLConf", "FormatOption").foreach { by =>
+      val (sqlConf, options) = by match {
+        case "SQLConf" => ("false", Map.empty[String, String])
+        // Explicitly set SQLConf to true but still should not ignore corrupt files
+        case "FormatOption" => ("true", Map("ignoreCorruptFiles" -> "false"))
       }
-      assert(exception.getMessage().contains("is not a Parquet file"))
-      val exception2 = intercept[SparkException] {
-        testIgnoreCorruptFilesWithoutSchemaInfer()
+
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) {
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+          val exception = intercept[SparkException] {
+            testIgnoreCorruptFiles(options)
+          }
+          assert(exception.getMessage().contains("is not a Parquet file"))
+          val exception2 = intercept[SparkException] {
+            testIgnoreCorruptFilesWithoutSchemaInfer(options)
+          }
+          assert(exception2.getMessage().contains("is not a Parquet file"))
+        }
       }
-      assert(exception2.getMessage().contains("is not a Parquet file"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
