This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fef3379  [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
fef3379 is described below

commit fef337935179248e88aa4be84a4214fea1c3a11a
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Jul 2 06:09:54 2020 +0800

    [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options

    ### What changes were proposed in this pull request?

    This is a followup of https://github.com/apache/spark/pull/28760 to fix the remaining issues:
    1. should consider data source options when refreshing the cache by path at the end of `InsertIntoHadoopFsRelationCommand`
    2. should consider data source options when inferring the schema of a file source
    3. should consider data source options when getting the qualified path in file source v2

    ### Why are the changes needed?

    We didn't catch these issues in https://github.com/apache/spark/pull/28760 because that test case only checks for an error while initializing the file system. If the file system is initialized multiple times during a simple read/write action, the test case effectively only covers the first initialization.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Rewrote the test to make sure the entire data source read/write action can succeed.

    Closes #28948 from cloud-fan/fix.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Gengliang Wang <gengliang.w...@databricks.com>
    (cherry picked from commit 6edb20df834f7f9b85c1559ef78a3d0d2272e4df)
    Signed-off-by: Gengliang Wang <gengliang.w...@databricks.com>
---
 .../spark/sql/v2/avro/AvroDataSourceV2.scala        |  4 +--
 .../apache/spark/sql/execution/CacheManager.scala   | 15 +++++++----
 .../InsertIntoHadoopFsRelationCommand.scala         |  2 +-
 .../execution/datasources/SchemaMergeUtils.scala    |  4 ++-
 .../sql/execution/datasources/orc/OrcUtils.scala    |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala     |  3 ++-
 .../datasources/parquet/ParquetUtils.scala          |  2 +-
 .../datasources/v2/FileDataSourceV2.scala           | 13 ++++++---
 .../datasources/v2/csv/CSVDataSourceV2.scala        |  4 +--
 .../datasources/v2/json/JsonDataSourceV2.scala      |  4 +--
 .../datasources/v2/orc/OrcDataSourceV2.scala        |  4 +--
 .../v2/parquet/ParquetDataSourceV2.scala            |  4 +--
 .../datasources/v2/text/TextDataSourceV2.scala      |  4 +--
 .../spark/sql/FileBasedDataSourceSuite.scala        | 31 +++++++++++++---------
 .../execution/datasources/orc/OrcSourceSuite.scala  |  4 +--
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala   |  4 +--
 16 files changed, 59 insertions(+), 45 deletions(-)
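To make the intended behavior concrete, here is a minimal sketch of the scenario the rewritten test exercises: a Hadoop config passed per-query through `option()` must reach every file system instantiation performed by the action. The `ds_option` key and the `FakeFileSystemRequiringDSOption` class come from the test diff below; the output path is a hypothetical placeholder.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Route the local file:// scheme through the fake file system defined in the
// test suite; its initialize() fails unless the data source option reached it.
spark.conf.set("fs.file.impl", classOf[FakeFileSystemRequiringDSOption].getName)
spark.conf.set("fs.file.impl.disable.cache", "true")

val path = "file:/tmp/spark-31935-demo" // hypothetical path

// After this fix, "ds_option" is applied to every Hadoop conf the action
// creates: the write itself, the cache refresh by path, file source schema
// inference, and the qualified-path computation in file source v2.
spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
spark.read.option("ds_option", "value").parquet(path).show()
```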
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
index c6f52d6..969dee0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
@@ -31,13 +31,13 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 52cec8b..7d86c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -248,12 +248,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
   def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index f119721..fe733f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -192,7 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index 99882b0..28097c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -32,10 +32,12 @@ object SchemaMergeUtils extends Logging {
    */
   def mergeSchemasInParallel(
       sparkSession: SparkSession,
+      parameters: Map[String, String],
       files: Seq[FileStatus],
       schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
       : Option[StructType] = {
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     // Here is a hack for Parquet, but it can be used by Orc as well.
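For readers unfamiliar with the helper these hunks lean on: `SessionState.newHadoopConfWithOptions` overlays the data source options on a copy of the session Hadoop configuration, so per-action options win over session and cluster defaults. A rough sketch of that overlay semantics follows (not the exact Spark source; `hadoopConfWithOptions` is an illustrative name):

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative overlay: copy the session-level Hadoop conf, then apply each
// data source option on top of it. The "path"/"paths" keys are data source
// bookkeeping rather than Hadoop settings, so they are skipped.
def hadoopConfWithOptions(
    sessionConf: Configuration,
    options: Map[String, String]): Configuration = {
  val conf = new Configuration(sessionConf) // copy; never mutate the shared conf
  options.foreach { case (k, v) =>
    if (v != null && k != "path" && k != "paths") conf.set(k, v)
  }
  conf
}
```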
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index eea9b2a..d274bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -109,7 +109,7 @@ object OrcUtils extends Logging {
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
     if (orcOptions.mergeSchema) {
       SchemaMergeUtils.mergeSchemasInParallel(
-        sparkSession, files, OrcUtils.readOrcSchemasInParallel)
+        sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
     } else {
       OrcUtils.readSchema(sparkSession, files)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 7187410..68f49f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -475,6 +475,7 @@ object ParquetFileFormat extends Logging {
    *         S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
@@ -490,7 +491,7 @@ object ParquetFileFormat extends Logging {
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
     }
 
-    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
+    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 7e7dba9..b91d75c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -104,7 +104,7 @@ object ParquetUtils {
         .orElse(filesByType.data.headOption)
         .toSeq
     }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 30a964d..bbe8835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -53,14 +56,16 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
     paths ++ Option(map.get("path")).toSeq
   }
 
-  protected def getTableName(paths: Seq[String]): String = {
-    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+  protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
+      map.asCaseSensitiveMap().asScala.toMap)
+    val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
   }
 
-  private def qualifiedPathName(path: String): String = {
+  private def qualifiedPathName(path: String, hadoopConf: Configuration): String = {
     val hdfsPath = new Path(path)
-    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = hdfsPath.getFileSystem(hadoopConf)
     hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 1f99d42..69d001b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -31,13 +31,13 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
index 7a0949e..9c4e3b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -31,13 +31,13 @@ class JsonDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index 8665af3..fa2febd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -31,13 +31,13 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
index 8cb6186..7e7ca96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
@@ -31,13 +31,13 @@ class ParquetDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
index 049c717..43bcb61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -31,13 +31,13 @@ class TextDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index d8157d3..231a8f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.nio.file.{Files, StandardOpenOption}
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -845,19 +847,15 @@ class FileBasedDataSourceSuite extends QueryTest
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
     Seq("parquet", "").foreach { format =>
"").foreach { format => - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { + withSQLConf( + SQLConf.USE_V1_SOURCE_LIST.key -> format, + "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName, + "fs.file.impl.disable.cache" -> "true") { withTempDir { dir => - val path = dir.getCanonicalPath - val defaultFs = "nonexistFS://nonexistFS" - val expectMessage = "No FileSystem for scheme nonexistFS" - val message1 = intercept[java.io.IOException] { - spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path) - }.getMessage - assert(message1.filterNot(Set(':', '"').contains) == expectMessage) - val message2 = intercept[java.io.IOException] { - spark.read.option("fs.defaultFS", defaultFs).parquet(path) - }.getMessage - assert(message2.filterNot(Set(':', '"').contains) == expectMessage) + val path = "file:" + dir.getCanonicalPath.stripPrefix("file:") + spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path) + checkAnswer( + spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF()) } } } @@ -932,3 +930,10 @@ object TestingUDT { override def userClass: Class[NullData] = classOf[NullData] } } + +class FakeFileSystemRequiringDSOption extends LocalFileSystem { + override def initialize(name: URI, conf: Configuration): Unit = { + super.initialize(name, conf) + require(conf.get("ds_option", "") == "value") + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 7387368..b70fd74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -213,9 +213,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten val schema = SchemaMergeUtils.mergeSchemasInParallel( - spark, - fileStatuses, - schemaReader) + spark, Map.empty, fileStatuses, schemaReader) assert(schema.isDefined) assert(schema.get == StructType(Seq( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 7f2eb14..356b92b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -70,9 +70,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) if (orcOptions.mergeSchema) { SchemaMergeUtils.mergeSchemasInParallel( - sparkSession, - files, - OrcFileOperator.readOrcSchemasInParallel) + sparkSession, options, files, OrcFileOperator.readOrcSchemasInParallel) } else { val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles OrcFileOperator.readSchema( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org