This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bc471f3  [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
bc471f3 is described below

commit bc471f3ec76618409fc9cc5791ddd2e3beca9c5c
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Sep 8 09:45:17 2020 +0900

    [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema

    ### What changes were proposed in this pull request?

    In the PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
    * no user-specified schema
    * some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.

    ### Why are the changes needed?

    To fix the issue when the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
    ```scala
    spark.read.csv("""/tmp/\[abc\].csv""").show
    spark.read.json("""/tmp/\[abc\].json""").show
    ```
    but end up hitting an exception:
    ```
    org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
      at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
      at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
      at scala.collection.immutable.List.foreach(List.scala:392)
    ```

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Added new test cases in `DataFrameReaderWriterSuite`.

    Closes #29663 from MaxGekk/globbing-paths-when-inferring-schema-2.4.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
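For context on the failure: Hadoop's glob syntax treats `[abc]` as a character class matching a single character `a`, `b`, or `c`, so when the already-resolved literal path is globbed a second time during schema inference, the directory named `[abc]` no longer matches. A minimal sketch of the two behaviors, using only Hadoop's public `FileSystem.globStatus` API (not part of this patch; paths are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Interpreted as a glob, "[abc]" is a character class: it matches the
    // one-character names "a", "b" or "c", never a file literally named "[abc].csv".
    val asGlob = Option(fs.globStatus(new Path("/tmp/[abc].csv")))
    // Escaping the metacharacters makes the pattern match only the literal name.
    val escaped = Option(fs.globStatus(new Path("""/tmp/\[abc\].csv""")))
    // globStatus returns null/empty when nothing matches; Option(...) guards that.
    println(s"as glob: ${asGlob.map(_.length)}, escaped: ${escaped.map(_.length)}")
  }
}
```

The patch avoids that second round of globbing: the inner `DataSource` created for schema inference is passed the internal `__globPaths__ -> "false"` option, so the already-resolved literal paths are used as-is.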
---
 .../sql/execution/datasources/DataSource.scala     | 27 ++++++++++++++++++----
 .../execution/datasources/csv/CSVDataSource.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala          |  2 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 23 ++++++++++++++++++
 4 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ff5fe09..31c91f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -65,8 +65,9 @@ import org.apache.spark.util.Utils
  * metadata. For example, when reading a partitioned table from a file system, partition columns
  * will be inferred from the directory layout even if they are not specified.
  *
- * @param paths A list of file system paths that hold data. These will be globbed before and
- *              qualified. This option only works when reading from a [[FileFormat]].
+ * @param paths A list of file system paths that hold data. These will be globbed before if
+ *              the "__globPaths__" option is true, and will be qualified. This option only works
+ *              when reading from a [[FileFormat]].
  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
  *                            we skip attempting to infer the schema.
  * @param partitionColumns A list of column names that the relation is partitioned by. This list is
@@ -97,6 +98,15 @@ case class DataSource(
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
 
+  /**
+   * Whether or not paths should be globbed before being used to access files.
+   */
+  def globPaths: Boolean = {
+    options.get(DataSource.GLOB_PATHS_KEY)
+      .map(_ == "true")
+      .getOrElse(true)
+  }
+
   bucketSpec.map { bucket =>
     SchemaUtils.checkColumnNameDuplication(
       bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -223,7 +233,7 @@ case class DataSource(
         // For glob pattern, we do not check it because the glob pattern might only make sense
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
-        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+        if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
           val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
@@ -550,7 +560,11 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      val globPath = if (globPaths) {
+        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      } else {
+        qualified :: Nil
+      }
 
       if (checkEmptyGlobPath && globPath.isEmpty) {
         throw new AnalysisException(s"Path does not exist: $qualified")
@@ -741,4 +755,9 @@ object DataSource extends Logging {
           """.stripMargin)
     }
   }
+
+  /**
+   * The key in the "options" map for deciding whether or not to glob paths before use.
+   */
+  val GLOB_PATHS_KEY = "__globPaths__"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 62c3c16..ee408c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -276,7 +276,7 @@ object TextInputCSVDataSource extends CSVDataSource {
           sparkSession,
           paths = paths,
           className = classOf[TextFileFormat].getName,
-          options = options.parameters
+          options = options.parameters ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index d6c5888..a7a3cc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -119,7 +119,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession,
       paths = inputPaths.map(_.getPath.toString),
       className = classOf[TextFileFormat].getName,
-      options = parsedOptions.parameters
+      options = parsedOptions.parameters.originalMap ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
     ).resolveRelation(checkFilesExist = false))
     .select("value").as(Encoders.STRING)
   }
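The new test below round-trips a directory whose name contains glob metacharacters and reads it back through an escaped path. For the user-facing recipe in isolation, a hedged stand-alone sketch (the `escapeGlob` name is mine and mirrors the test's `escape` helper; the path is illustrative):

```scala
// Escape the glob metacharacters [ ] { } so Hadoop treats the path literally.
def escapeGlob(str: String): String =
  """(\[|\]|\{|\})""".r.replaceAllIn(str, """\\$1""")

// Assuming an active SparkSession `spark` and JSON files under /tmp/{def}:
val df = spark.read.json(s"/tmp/${escapeGlob("{def}")}")
df.show()
```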
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2b5b227..6b1a8e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -946,4 +946,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       }
     }
   }
+
+  test("SPARK-32810: CSV and JSON data sources should be able to read files with " +
+    "escaped glob metacharacter in the paths") {
+    def escape(str: String): String = {
+      """(\[|\]|\{|\})""".r.replaceAllIn(str, """\\$1""")
+    }
+
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+
+      // test CSV writer / reader without specifying schema
+      val csvTableName = "[abc]"
+      spark.range(3).coalesce(1).write.csv(s"$basePath/$csvTableName")
+      val csvDf = spark.read.csv(s"$basePath/${escape(csvTableName)}")
+      assert(csvDf.collect sameElements Array(Row("0"), Row("1"), Row("2")))
+
+      // test JSON writer / reader without specifying schema
+      val jsonTableName = "{def}"
+      spark.range(3).coalesce(1).write.json(s"$basePath/$jsonTableName")
+      val jsonDf = spark.read.json(s"$basePath/${escape(jsonTableName)}")
+      assert(jsonDf.collect sameElements Array(Row(0), Row(1), Row(2)))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org