This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a92ef00145b  [SPARK-38767][SQL] Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options
a92ef00145b is described below

commit a92ef00145b264013e11de12f2c7cee62c28198d
Author: yaohua <yaohua.z...@databricks.com>
AuthorDate: Tue Apr 12 12:50:13 2022 -0700

    [SPARK-38767][SQL] Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options

    ### What changes were proposed in this pull request?
    Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source options for both V1 and V2 file-based data sources.
    ```
    spark.read...option("ignoreCorruptFiles", "true")...
    spark.read...option("ignoreMissingFiles", "true")...
    ```

    ### Why are the changes needed?
    Improve UX

    ### Does this PR introduce _any_ user-facing change?
    Yes. Previously:
    ```
    spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
    spark.sql("set spark.sql.files.ignoreMissingFiles=true")
    spark.read...
    spark.read...
    ```
    Now:
    ```
    spark.read...option("ignoreCorruptFiles", "true")...
    spark.read...option("ignoreMissingFiles", "true")...
    ```

    ### How was this patch tested?
    Enhance existing UTs for ignoreMissingFiles + ignoreCorruptFiles

    Closes #36069 from Yaohua628/spark-38767.

    Authored-by: yaohua <yaohua.z...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroOptions.scala     |  4 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala       |  5 ++-
 .../sql/v2/avro/AvroPartitionReaderFactory.scala    | 12 +++---
 docs/sql-data-sources-generic-options.md            |  4 +-
 .../examples/sql/JavaSQLDataSourceExample.java      | 19 +++++++--
 examples/src/main/python/sql/datasource.py          | 21 ++++++++--
 examples/src/main/r/RSparkSQLExample.R              | 14 +++++--
 .../spark/examples/sql/SQLDataSourceExample.scala   | 19 +++++++--
 .../spark/sql/catalyst/FileSourceOptions.scala      | 42 +++++++++++++++++++
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala  |  3 +-
 .../spark/sql/catalyst/json/JSONOptions.scala       |  3 +-
 .../spark/sql/execution/DataSourceScanExec.scala    |  8 ++--
 .../sql/execution/datasources/FileScanRDD.scala     | 10 +++--
 .../execution/datasources/SchemaMergeUtils.scala    |  5 ++-
 .../execution/datasources/orc/OrcFileFormat.scala   |  5 ++-
 .../sql/execution/datasources/orc/OrcOptions.scala  |  3 +-
 .../sql/execution/datasources/orc/OrcUtils.scala    |  7 ++--
 .../datasources/parquet/ParquetOptions.scala        |  3 +-
 .../execution/datasources/text/TextOptions.scala    |  3 +-
 .../datasources/v2/FilePartitionReader.scala        | 11 ++---
 .../v2/FilePartitionReaderFactory.scala             |  9 +++--
 .../v2/csv/CSVPartitionReaderFactory.scala          | 16 ++++----
 .../v2/json/JsonPartitionReaderFactory.scala        | 10 ++---
 .../v2/orc/OrcPartitionReaderFactory.scala          |  8 ++--
 .../sql/execution/datasources/v2/orc/OrcScan.scala  |  6 ++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala  |  8 ++--
 .../v2/text/TextPartitionReaderFactory.scala        |  8 ++--
 .../spark/sql/FileBasedDataSourceSuite.scala        | 31 ++++++++------
 .../datasources/parquet/ParquetQuerySuite.scala     | 47 +++++++++++++++-------
 29 files changed, 241 insertions(+), 103 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 48b2c3481a6..7f6a274753c 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import
org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode} import org.apache.spark.sql.internal.SQLConf @@ -33,7 +34,8 @@ import org.apache.spark.sql.internal.SQLConf */ private[sql] class AvroOptions( @transient val parameters: CaseInsensitiveMap[String], - @transient val conf: Configuration) extends Logging with Serializable { + @transient val conf: Configuration) + extends FileSourceOptions(parameters) with Logging { def this(parameters: Map[String, String], conf: Configuration) = { this(CaseInsensitiveMap(parameters), conf) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index ef9d22f35d0..d03902faab9 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -35,7 +35,8 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -57,7 +58,7 @@ private[sql] object AvroUtils extends Logging { val avroSchema = parsedOptions.schema .getOrElse { inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension, - spark.sessionState.conf.ignoreCorruptFiles) + new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles) } SchemaConverters.toSqlType(avroSchema).dataType match { diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index a4dfdbfe68f..3ad63f113fe 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.SerializableConfiguration * @param dataSchema Schema of AVRO files. * @param readDataSchema Required data schema of AVRO files. * @param partitionSchema Schema of partitions. - * @param parsedOptions Options for parsing AVRO files. + * @param options Options for parsing AVRO files. 
*/ case class AvroPartitionReaderFactory( sqlConf: SQLConf, @@ -54,15 +54,15 @@ case class AvroPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - parsedOptions: AvroOptions, + options: AvroOptions, filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging { - private val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead + private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - val userProvidedSchema = parsedOptions.schema + val userProvidedSchema = options.schema - if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) { + if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) { val reader = { val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf) try { @@ -104,7 +104,7 @@ case class AvroPartitionReaderFactory( override val deserializer = new AvroDeserializer( userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, - parsedOptions.positionalFieldMatching, + options.positionalFieldMatching, datetimeRebaseMode, avroFilters) override val stopPosition = partitionedFile.start + partitionedFile.length diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md index 2e4fc879a43..7835371ec43 100644 --- a/docs/sql-data-sources-generic-options.md +++ b/docs/sql-data-sources-generic-options.md @@ -38,7 +38,7 @@ dir1/ ### Ignore Corrupt Files -Spark allows you to use `spark.sql.files.ignoreCorruptFiles` to ignore corrupt files while reading data +Spark allows you to use the configuration `spark.sql.files.ignoreCorruptFiles` or the data source option `ignoreCorruptFiles` to ignore corrupt files while reading data from files. When set to true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned. @@ -64,7 +64,7 @@ To ignore corrupt files while reading data files, you can use: ### Ignore Missing Files -Spark allows you to use `spark.sql.files.ignoreMissingFiles` to ignore missing files while reading data +Spark allows you to use the configuration `spark.sql.files.ignoreMissingFiles` or the data source option `ignoreMissingFiles` to ignore missing files while reading data from files. Here, missing file really means the deleted file under directory after you construct the `DataFrame`. When set to true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned. 
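To complement the documentation change above, here is a minimal Scala sketch of the new `ignoreMissingFiles` data source option; the paths reuse the example resource directories referenced elsewhere in this patch and are illustrative only. The per-read option applies regardless of the `spark.sql.files.ignoreMissingFiles` session setting.

```scala
import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IgnoreMissingFilesSketch").getOrCreate()

    // The per-read option skips files that were deleted after the DataFrame was
    // constructed, overriding spark.sql.files.ignoreMissingFiles for this read only.
    val df = spark.read
      .option("ignoreMissingFiles", "true")
      .parquet(
        "examples/src/main/resources/dir1/",
        "examples/src/main/resources/dir1/dir2/")

    df.show()
    spark.stop()
  }
}
```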
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 5dcf321a4c8..c0960540b49 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -115,13 +115,26 @@ public class JavaSQLDataSourceExample { private static void runGenericFileSourceOptionsExample(SparkSession spark) { // $example on:ignore_corrupt_files$ - // enable ignore corrupt files + // enable ignore corrupt files via the data source option + // dir1/file3.json is corrupt from parquet's view + Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet( + "examples/src/main/resources/dir1/", + "examples/src/main/resources/dir1/dir2/"); + testCorruptDF0.show(); + // +-------------+ + // | file| + // +-------------+ + // |file1.parquet| + // |file2.parquet| + // +-------------+ + + // enable ignore corrupt files via the configuration spark.sql("set spark.sql.files.ignoreCorruptFiles=true"); // dir1/file3.json is corrupt from parquet's view - Dataset<Row> testCorruptDF = spark.read().parquet( + Dataset<Row> testCorruptDF1 = spark.read().parquet( "examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"); - testCorruptDF.show(); + testCorruptDF1.show(); // +-------------+ // | file| // +-------------+ diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index fd312dbf164..c7522cb9d34 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -28,12 +28,25 @@ from pyspark.sql import Row def generic_file_source_options_example(spark: SparkSession) -> None: # $example on:ignore_corrupt_files$ - # enable ignore corrupt files + # enable ignore corrupt files via the data source option + # dir1/file3.json is corrupt from parquet's view + test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\ + .parquet("examples/src/main/resources/dir1/", + "examples/src/main/resources/dir1/dir2/") + test_corrupt_df0.show() + # +-------------+ + # | file| + # +-------------+ + # |file1.parquet| + # |file2.parquet| + # +-------------+ + + # enable ignore corrupt files via the configuration spark.sql("set spark.sql.files.ignoreCorruptFiles=true") # dir1/file3.json is corrupt from parquet's view - test_corrupt_df = spark.read.parquet("examples/src/main/resources/dir1/", - "examples/src/main/resources/dir1/dir2/") - test_corrupt_df.show() + test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/", + "examples/src/main/resources/dir1/dir2/") + test_corrupt_df1.show() # +-------------+ # | file| # +-------------+ diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 15118e118ab..a7d3ae766c5 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -101,11 +101,19 @@ df <- sql("SELECT * FROM table") # Ignore corrupt files # $example on:ignore_corrupt_files$ -# enable ignore corrupt files +# enable ignore corrupt files via the data source option +# dir1/file3.json is corrupt from parquet's view +testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true") +head(testCorruptDF0) +# file +# 1 file1.parquet +# 2 file2.parquet + +# enable ignore corrupt files via the 
configuration sql("set spark.sql.files.ignoreCorruptFiles=true") # dir1/file3.json is corrupt from parquet's view -testCorruptDF <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/")) -head(testCorruptDF) +testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/")) +head(testCorruptDF1) # file # 1 file1.parquet # 2 file2.parquet diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 6bd2bd6d3bf..9b04994199d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -45,13 +45,26 @@ object SQLDataSourceExample { private def runGenericFileSourceOptionsExample(spark: SparkSession): Unit = { // $example on:ignore_corrupt_files$ - // enable ignore corrupt files + // enable ignore corrupt files via the data source option + // dir1/file3.json is corrupt from parquet's view + val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet( + "examples/src/main/resources/dir1/", + "examples/src/main/resources/dir1/dir2/") + testCorruptDF0.show() + // +-------------+ + // | file| + // +-------------+ + // |file1.parquet| + // |file2.parquet| + // +-------------+ + + // enable ignore corrupt files via the configuration spark.sql("set spark.sql.files.ignoreCorruptFiles=true") // dir1/file3.json is corrupt from parquet's view - val testCorruptDF = spark.read.parquet( + val testCorruptDF1 = spark.read.parquet( "examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/") - testCorruptDF.show() + testCorruptDF1.show() // +-------------+ // | file| // +-------------+ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala new file mode 100644 index 00000000000..6b9826d652e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst + +import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.internal.SQLConf + +/** + * Common options for the file-based data source. 
+ */ +class FileSourceOptions( + @transient private val parameters: CaseInsensitiveMap[String]) + extends Serializable { + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean) + .getOrElse(SQLConf.get.ignoreCorruptFiles) + + val ignoreMissingFiles: Boolean = parameters.get(IGNORE_MISSING_FILES).map(_.toBoolean) + .getOrElse(SQLConf.get.ignoreMissingFiles) +} + +object FileSourceOptions { + val IGNORE_CORRUPT_FILES = "ignoreCorruptFiles" + val IGNORE_MISSING_FILES = "ignoreMissingFiles" +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 2a404b14bfd..9daa50ba5a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -24,6 +24,7 @@ import java.util.Locale import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf @@ -34,7 +35,7 @@ class CSVOptions( val columnPruning: Boolean, defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) - extends Logging with Serializable { + extends FileSourceOptions(parameters) with Logging { def this( parameters: Map[String, String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index e801912e192..5f90dbc49c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder} import com.fasterxml.jackson.core.json.JsonReadFeature import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy @@ -38,7 +39,7 @@ private[sql] class JSONOptions( @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) - extends Logging with Serializable { + extends FileSourceOptions(parameters) with Logging { def this( parameters: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 5067cd7fa3c..5cf8aa91ea5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -25,12 +25,12 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} -import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators @@ -620,7 +620,7 @@ case class FileSourceScanExec( } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, - requiredSchema, metadataColumns) + requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options))) } /** @@ -677,7 +677,7 @@ case class FileSourceScanExec( FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) new FileScanRDD(fsRelation.sparkSession, readFile, partitions, - requiredSchema, metadataColumns) + requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options))) } // Filters unused DynamicPruningExpression expressions - one which has been replaced diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 20c393a5c0e..97776413509 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -27,8 +27,9 @@ import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskC import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.FileFormat._ import org.apache.spark.sql.execution.vectorized.ConstantColumnVector @@ -69,11 +70,12 @@ class FileScanRDD( readFunction: (PartitionedFile) => Iterator[InternalRow], @transient val filePartitions: Seq[FilePartition], val readDataSchema: StructType, - val metadataColumns: Seq[AttributeReference] = Seq.empty) + val metadataColumns: Seq[AttributeReference] = Seq.empty, + options: FileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(Map.empty))) extends RDD[InternalRow](sparkSession.sparkContext, Nil) { - private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + private val ignoreCorruptFiles = options.ignoreCorruptFiles + private val ignoreMissingFiles = options.ignoreMissingFiles override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { val iterator = new Iterator[Object] with AutoCloseable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index 98f580f2d4a..babecfc1f38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -60,7 +62,8 @@ object SchemaMergeUtils extends Logging { val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1), sparkSession.sparkContext.defaultParallelism) - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + val ignoreCorruptFiles = + new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreCorruptFiles // Issues a Spark job to read Parquet/ORC schema in parallel. val partiallyMergedSchemas = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 2b060c90153..02bc97cbdd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -33,9 +33,10 @@ import org.apache.orc.mapreduce._ import org.apache.spark.TaskContext import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ @@ -133,7 +134,7 @@ class OrcFileFormat sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + val ignoreCorruptFiles = new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles (file: PartitionedFile) => { val conf = broadcastedConf.value.value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index 9416996198a..ef1c2bb5b41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -21,6 +21,7 @@ import java.util.Locale import org.apache.orc.OrcConf.COMPRESS +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf @@ -30,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf class OrcOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) - extends Serializable { + extends FileSourceOptions(parameters) { import OrcOptions._ diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index a68ce1a8636..79abdfe4690 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -33,12 +33,11 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CaseInsensitiveMap, CharVarcharUtils} import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils} @@ -143,7 +142,7 @@ object OrcUtils extends Logging { def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) : Option[StructType] = { - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + val ignoreCorruptFiles = new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles val conf = sparkSession.sessionState.newHadoopConfWithOptions(options) files.iterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst { case Some(schema) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index da0c163dd85..07ed55b0b8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -22,6 +22,7 @@ import java.util.Locale import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf @@ -31,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) - extends Serializable { + extends FileSourceOptions(parameters) { import ParquetOptions._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index ef132162750..f1a1d465d1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.datasources.text import java.nio.charset.{Charset, 
StandardCharsets} +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} /** * Options for the Text data source. */ class TextOptions(@transient private val parameters: CaseInsensitiveMap[String]) - extends Serializable { + extends FileSourceOptions(parameters) { import TextOptions._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala index 00efd48f951..782c1f50d80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -23,18 +23,19 @@ import scala.util.control.NonFatal import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.connector.read.PartitionReader import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException -import org.apache.spark.sql.internal.SQLConf -class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]]) +class FilePartitionReader[T]( + readers: Iterator[PartitionedFileReader[T]], + options: FileSourceOptions) extends PartitionReader[T] with Logging { private var currentReader: PartitionedFileReader[T] = null - private val sqlConf = SQLConf.get - private def ignoreMissingFiles = sqlConf.ignoreMissingFiles - private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + private def ignoreMissingFiles = options.ignoreMissingFiles + private def ignoreCorruptFiles = options.ignoreCorruptFiles override def next(): Boolean = { if (currentReader == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala index da4f9e89fde..d7b88b505b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala @@ -16,20 +16,23 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.vectorized.ColumnarBatch abstract class FilePartitionReaderFactory extends PartitionReaderFactory { + + protected def options: FileSourceOptions + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { assert(partition.isInstanceOf[FilePartition]) val filePartition = partition.asInstanceOf[FilePartition] val iter = filePartition.files.iterator.map { file => PartitionedFileReader(file, buildReader(file)) } - new FilePartitionReader[InternalRow](iter) + new FilePartitionReader[InternalRow](iter, options) } override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { @@ -38,7 +41,7 @@ abstract 
class FilePartitionReaderFactory extends PartitionReaderFactory { val iter = filePartition.files.iterator.map { file => PartitionedFileReader(file, buildColumnarReader(file)) } - new FilePartitionReader[ColumnarBatch](iter) + new FilePartitionReader[ColumnarBatch](iter, options) } def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index bf996ab1b31..f8a17c8eaa8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration * @param dataSchema Schema of CSV files. * @param readDataSchema Required data schema in the batch scan. * @param partitionSchema Schema of partitions. - * @param parsedOptions Options for parsing CSV files. + * @param options Options for parsing CSV files. */ case class CSVPartitionReaderFactory( sqlConf: SQLConf, @@ -44,25 +44,25 @@ case class CSVPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - parsedOptions: CSVOptions, + options: CSVOptions, filters: Seq[Filter]) extends FilePartitionReaderFactory { override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value val actualDataSchema = StructType( - dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) + dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) val actualReadDataSchema = StructType( - readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) + readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) val parser = new UnivocityParser( actualDataSchema, actualReadDataSchema, - parsedOptions, + options, filters) - val schema = if (parsedOptions.columnPruning) actualReadDataSchema else actualDataSchema + val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( - schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile) - val iter = CSVDataSource(parsedOptions).readFile( + schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile) + val iter = CSVDataSource(options).readFile( conf, file, parser, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala index 9737803b597..d9cd41dd560 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala @@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration * @param dataSchema Schema of JSON files. * @param readDataSchema Required schema of JSON files. * @param partitionSchema Schema of partitions. - * @param parsedOptions Options for parsing JSON files. + * @param options Options for parsing JSON files. * @param filters The filters pushed down to JSON datasource. 
*/ case class JsonPartitionReaderFactory( @@ -45,18 +45,18 @@ case class JsonPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - parsedOptions: JSONOptionsInRead, + options: JSONOptionsInRead, filters: Seq[Filter]) extends FilePartitionReaderFactory { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { val actualSchema = - StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) + StructType(readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord)) val parser = new JacksonParser( actualSchema, - parsedOptions, + options, allowArrayAsStructs = true, filters) - val iter = JsonDataSource(parsedOptions).readFile( + val iter = JsonDataSource(options).readFile( broadcastedConf.value.value, partitionedFile, parser, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index ef13beaf9b4..59e3214f047 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitionedFile} -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcOptions, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -49,6 +49,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} * @param dataSchema Schema of orc files. * @param readDataSchema Required data schema in the batch scan. * @param partitionSchema Schema of partitions. + * @param options Options for parsing ORC files. 
*/ case class OrcPartitionReaderFactory( sqlConf: SQLConf, @@ -57,12 +58,13 @@ case class OrcPartitionReaderFactory( readDataSchema: StructType, partitionSchema: StructType, filters: Array[Filter], - aggregation: Option[Aggregation]) extends FilePartitionReaderFactory { + aggregation: Option[Aggregation], + options: OrcOptions) extends FilePartitionReaderFactory { private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields) private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize private val orcFilterPushDown = sqlConf.orcFilterPushDown - private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + private val ignoreCorruptFiles = options.ignoreCorruptFiles override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index baf307257c3..ad8857d9803 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.orc +import scala.collection.JavaConverters.mapAsScalaMapConverter + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.orc.OrcOptions import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType @@ -64,7 +67,8 @@ case class OrcScan( // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, pushedFilters, pushedAggregate) + dataSchema, readDataSchema, readPartitionSchema, pushedFilters, pushedAggregate, + new OrcOptions(options.asScala.toMap, sparkSession.sessionState.conf)) } override def equals(obj: Any): Boolean = obj match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index ea4f5e0d287..9a25dd88ff4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.SerializableConfiguration * @param partitionSchema Schema of partitions. * @param filters Filters to be pushed down in the batch scan. * @param aggregation Aggregation to be pushed down in the batch scan. - * @param parquetOptions The options of Parquet datasource that are set for the read. 
+ * @param options The options of Parquet datasource that are set for the read. */ case class ParquetPartitionReaderFactory( sqlConf: SQLConf, @@ -66,7 +66,7 @@ case class ParquetPartitionReaderFactory( partitionSchema: StructType, filters: Array[Filter], aggregation: Option[Aggregation], - parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging { + options: ParquetOptions) extends FilePartitionReaderFactory with Logging { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields) private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled @@ -81,8 +81,8 @@ case class ParquetPartitionReaderFactory( private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead - private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead + private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead + private val int96RebaseModeInRead = options.int96RebaseModeInRead private def getFooter(file: PartitionedFile): ParquetMetadata = { val conf = broadcastedConf.value.value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala index 0cd184da6ef..6542c1c2c3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala @@ -36,19 +36,19 @@ import org.apache.spark.util.SerializableConfiguration * @param broadcastedConf Broadcasted serializable Hadoop Configuration. * @param readDataSchema Required schema in the batch scan. * @param partitionSchema Schema of partitions. - * @param textOptions Options for reading a text file. + * @param options Options for reading a text file. 
* */ case class TextPartitionReaderFactory( sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], readDataSchema: StructType, partitionSchema: StructType, - textOptions: TextOptions) extends FilePartitionReaderFactory { + options: TextOptions) extends FilePartitionReaderFactory { override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val confValue = broadcastedConf.value.value - val reader = if (!textOptions.wholeText) { - new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue) + val reader = if (!options.wholeText) { + new HadoopFileLinesReader(file, options.lineSeparatorInRead, confValue) } else { new HadoopFileWholeTextReader(file, confValue) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 9e7eb1d0ad5..5011a7713a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -181,7 +181,7 @@ class FileBasedDataSourceSuite extends QueryTest allFileBasedDataSources.foreach { format => testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") { - def testIgnoreMissingFiles(): Unit = { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath @@ -197,7 +197,7 @@ class FileBasedDataSourceSuite extends QueryTest fs.listStatus(p).filter(_.isFile).map(_.getPath) } - val df = spark.read.format(format).load( + val df = spark.read.options(options).format(format).load( new Path(basePath, "first").toString, new Path(basePath, "second").toString, new Path(basePath, "third").toString, @@ -214,20 +214,27 @@ class FileBasedDataSourceSuite extends QueryTest } } + // Test set ignoreMissingFiles via SQL Conf and Data Source reader options for { - ignore <- Seq("true", "false") + (ignore, options, sqlConf) <- Seq( + // Set via SQL Conf: leave options empty + ("true", Map.empty[String, String], "true"), + ("false", Map.empty[String, String], "false"), + // Set via reader options: explicitly set SQL Conf to opposite + ("true", Map("ignoreMissingFiles" -> "true"), "false"), + ("false", Map("ignoreMissingFiles" -> "false"), "true")) sources <- Seq("", format) } { - withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> ignore, - SQLConf.USE_V1_SOURCE_LIST.key -> sources) { - if (ignore.toBoolean) { - testIgnoreMissingFiles() - } else { - val exception = intercept[SparkException] { - testIgnoreMissingFiles() - } - assert(exception.getMessage().contains("does not exist")) + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> sources, + SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + if (ignore.toBoolean) { + testIgnoreMissingFiles(options) + } else { + val exception = intercept[SparkException] { + testIgnoreMissingFiles(options) } + assert(exception.getMessage().contains("does not exist")) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 64944054326..426d477d38f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -306,13 +306,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with 
SharedS } test("Enabling/disabling ignoreCorruptFiles") { - def testIgnoreCorruptFiles(): Unit = { + def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString) spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString) spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString) - val df = spark.read.parquet( + val df = spark.read.options(options).parquet( new Path(basePath, "first").toString, new Path(basePath, "second").toString, new Path(basePath, "third").toString) @@ -320,13 +320,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } - def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = { + def testIgnoreCorruptFilesWithoutSchemaInfer(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString) spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString) spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString) - val df = spark.read.schema("a long").parquet( + val df = spark.read.options(options).schema("a long").parquet( new Path(basePath, "first").toString, new Path(basePath, "second").toString, new Path(basePath, "third").toString) @@ -334,20 +334,39 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } - withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { - testIgnoreCorruptFiles() - testIgnoreCorruptFilesWithoutSchemaInfer() + // Test ignoreCorruptFiles = true + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("true", Map.empty[String, String]) + // Explicitly set SQLConf to false but still should ignore corrupt files + case "FormatOption" => ("false", Map("ignoreCorruptFiles" -> "true")) + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) { + testIgnoreCorruptFiles(options) + testIgnoreCorruptFilesWithoutSchemaInfer(options) + } } - withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { - val exception = intercept[SparkException] { - testIgnoreCorruptFiles() + // Test ignoreCorruptFiles = false + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("false", Map.empty[String, String]) + // Explicitly set SQLConf to true but still should not ignore corrupt files + case "FormatOption" => ("true", Map("ignoreCorruptFiles" -> "false")) } - assert(exception.getMessage().contains("is not a Parquet file")) - val exception2 = intercept[SparkException] { - testIgnoreCorruptFilesWithoutSchemaInfer() + + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val exception = intercept[SparkException] { + testIgnoreCorruptFiles(options) + } + assert(exception.getMessage().contains("is not a Parquet file")) + val exception2 = intercept[SparkException] { + testIgnoreCorruptFilesWithoutSchemaInfer(options) + } + assert(exception2.getMessage().contains("is not a Parquet file")) + } } - assert(exception2.getMessage().contains("is not a Parquet file")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
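As a closing sketch of the precedence the new tests exercise (the session conf deliberately set to the opposite of the reader option): when both are present, the per-read option wins, since `FileSourceOptions` falls back to the SQL conf only when the option is absent. The session setup and paths below are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IgnoreCorruptFilesPrecedenceSketch").getOrCreate()

    // Session conf says "fail on corrupt files" ...
    spark.sql("set spark.sql.files.ignoreCorruptFiles=false")

    // ... but the reader option overrides it for this read, mirroring the
    // "FormatOption" branches in the test matrix above.
    val df = spark.read
      .option("ignoreCorruptFiles", "true")
      .parquet(
        "examples/src/main/resources/dir1/",
        "examples/src/main/resources/dir1/dir2/")

    df.show()
    spark.stop()
  }
}
```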