[spark] branch branch-2.4 updated: [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bc471f3  [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
bc471f3 is described below

commit bc471f3ec76618409fc9cc5791ddd2e3beca9c5c
Author: Max Gekk
AuthorDate: Tue Sep 8 09:45:17 2020 +0900

    [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema

    ### What changes were proposed in this pull request?

    In this PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
    * no user-specified schema
    * some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.

    ### Why are the changes needed?

    To fix the issue when the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
    ```scala
    spark.read.csv("""/tmp/\[abc\].csv""").show
    spark.read.json("""/tmp/\[abc\].json""").show
    ```
    but end up hitting an exception:
    ```
    org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
      at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
      at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
      at scala.collection.immutable.List.foreach(List.scala:392)
    ```

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Added new test cases in `DataFrameReaderWriterSuite`.

    Closes #29663 from MaxGekk/globbing-paths-when-inferring-schema-2.4.
Authored-by: Max Gekk
Signed-off-by: HyukjinKwon
---
 .../sql/execution/datasources/DataSource.scala     | 27 ++
 .../execution/datasources/csv/CSVDataSource.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala          |  2 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 23 ++
 4 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ff5fe09..31c91f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -65,8 +65,9 @@ import org.apache.spark.util.Utils
  * metadata. For example, when reading a partitioned table from a file system, partition columns
  * will be inferred from the directory layout even if they are not specified.
  *
- * @param paths A list of file system paths that hold data. These will be globbed before and
- *              qualified. This option only works when reading from a [[FileFormat]].
+ * @param paths A list of file system paths that hold data. These will be globbed before if
+ *              the "__globPaths__" option is true, and will be qualified. This option only works
+ *              when reading from a [[FileFormat]].
  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
  *                            we skip attempting to infer the schema.
  * @param partitionColumns A list of column names that the relation is partitioned by. This list is
@@ -97,6 +98,15 @@ case class DataSource(
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver

+  /**
+   * Whether or not paths should be globbed before being used to access files.
+   */
+  def globPaths: Boolean = {
+    options.get(DataSource.GLOB_PATHS_KEY)
+      .map(_ == "true")
+      .getOrElse(true)
+  }
+
   bucketSpec.map { bucket =>
     SchemaUtils.checkColumnNameDuplication(
       bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -223,7 +233,7 @@ case class DataSource(
           // For glob pattern, we do not check it because the glob pattern might only make sense
           // once the streaming job starts and some upstream source starts dropping data.
           val hdfsPath = new Path(path)
-          if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+          if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
             val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
             if (!fs.exists(hdfsPath)) {
               throw new AnalysisException(s"Path does not exist: $path")
@@ -550,7 +560,11 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs,
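The core of the patch is the decision of when a path should be expanded as a glob. The following is a minimal standalone sketch of that decision logic — it is not Spark's actual code, and only the `"__globPaths__"` option key and its default of `true` are taken from the diff above; the object and method names here are illustrative:

```scala
// Sketch of the globbing decision the patch introduces: a path is only
// expanded as a glob when the internal "__globPaths__" option is absent
// or set to "true". Schema inference can pass "false" to treat already
// resolved file paths literally, avoiding a second round of globbing.
object GlobPathsSketch {
  // Option key name as it appears in the patch (DataSource.GLOB_PATHS_KEY).
  val GlobPathsKey = "__globPaths__"

  // Mirrors the patched `globPaths` accessor: default to globbing enabled.
  def globPaths(options: Map[String, String]): Boolean =
    options.get(GlobPathsKey).map(_ == "true").getOrElse(true)

  // Mirrors the patched existence check: skip glob expansion either when
  // globbing is disabled or when the path has no glob metacharacters.
  def treatAsLiteralPath(options: Map[String, String], isGlobPath: Boolean): Boolean =
    !globPaths(options) || !isGlobPath

  def main(args: Array[String]): Unit = {
    assert(globPaths(Map.empty))                              // default: globbing on
    assert(!globPaths(Map(GlobPathsKey -> "false")))          // explicitly disabled
    // With globbing disabled, even a path like "[abc].csv" is taken literally:
    assert(treatAsLiteralPath(Map(GlobPathsKey -> "false"), isGlobPath = true))
    // With globbing enabled, a glob-looking path is still expanded:
    assert(!treatAsLiteralPath(Map.empty, isGlobPath = true))
  }
}
```

This is why the fix works for files such as `[abc].csv`: when CSV/JSON schema inference re-reads paths it has already resolved, it can disable globbing so `[abc]` is no longer interpreted as a character class.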