Repository: spark Updated Branches: refs/heads/master c5aa54d54 -> bd32b509a
[SPARK-24645][SQL] Skip parsing when csvColumnPruning enabled and partitions scanned only ## What changes were proposed in this pull request? In the master, when `csvColumnPruning`(implemented in [this commit](https://github.com/apache/spark/commit/64fad0b519cf35b8c0a0dec18dd3df9488a5ed25#diff-d19881aceddcaa5c60620fdcda99b4c4)) enabled and partitions scanned only, it throws an exception below; ``` scala> val dir = "/tmp/spark-csv/csv" scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir) scala> spark.read.csv(dir).selectExpr("sum(p)").collect() 18/06/25 13:12:51 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5) java.lang.NullPointerException at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:197) at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:190) at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309) at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309) at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61) ... ``` This pr modified code to skip CSV parsing in the case. ## How was this patch tested? Added tests in `CSVSuite`. Author: Takeshi Yamamuro <yamam...@apache.org> Closes #21631 from maropu/SPARK-24645. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd32b509 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd32b509 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd32b509 Branch: refs/heads/master Commit: bd32b509a1728366494cba13f8f6612b7bd46ec0 Parents: c5aa54d Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Thu Jun 28 09:19:25 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Thu Jun 28 09:19:25 2018 +0800 ---------------------------------------------------------------------- .../sql/execution/datasources/csv/UnivocityParser.scala | 10 +++++++++- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 10 ++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bd32b509/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 5f7d569..aa545e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -183,11 +183,19 @@ class UnivocityParser( } } + private val doParse = if (schema.nonEmpty) { + (input: String) => convert(tokenizer.parseLine(input)) + } else { + // If `columnPruning` enabled and partition attributes scanned only, + // `schema` gets empty. + (_: String) => InternalRow.empty + } + /** * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). */ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): InternalRow = doParse(input) private def convert(tokens: Array[String]): InternalRow = { if (tokens.length != schema.length) { http://git-wip-us.apache.org/repos/asf/spark/blob/bd32b509/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 365239d..84b91f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1569,4 +1569,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(testAppender2.events.asScala .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema"))) } + + test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") { + withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") { + withTempPath { path => + val dir = path.getAbsolutePath + spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir) + checkAnswer(spark.read.csv(dir).selectExpr("sum(p)"), Row(5)) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org