Repository: spark
Updated Branches:
  refs/heads/master c5aa54d54 -> bd32b509a


[SPARK-24645][SQL] Skip parsing when csvColumnPruning enabled and partitions scanned only

## What changes were proposed in this pull request?
In the master branch, when `csvColumnPruning` (implemented in [this commit](https://github.com/apache/spark/commit/64fad0b519cf35b8c0a0dec18dd3df9488a5ed25#diff-d19881aceddcaa5c60620fdcda99b4c4)) is enabled and only partition columns are scanned, the query below throws an exception:

```
scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:12:51 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
java.lang.NullPointerException
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:197)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:190)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:309)
        at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
        ...
```
This PR modifies the code to skip CSV parsing entirely in that case.
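
For context, the gist of the change is to decide once, at parser construction time, whether any CSV parsing is needed at all. Below is a minimal standalone sketch of that pattern under illustrative names (`makeParser`, `requiredFields`, and `parseLine` are stand-ins, not Spark internals; the real change is in the diff further down):

```scala
object SkipParseSketch {
  type Row = Seq[String]

  // Hypothetical stand-ins: `requiredFields` plays the role of the parser's
  // required schema, `parseLine` the role of the univocity tokenizer.
  def makeParser(requiredFields: Seq[String], parseLine: String => Row): String => Row =
    if (requiredFields.nonEmpty) {
      (input: String) => parseLine(input)
    } else {
      // Only partition columns are needed: skip tokenizing the CSV line
      // and return an empty row for every input.
      (_: String) => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    val parse = makeParser(Nil, _.split(",").toSeq)
    assert(parse("a,b,c").isEmpty) // parsing was skipped entirely
  }
}
```

Precomputing the function this way also means the emptiness check happens once per parser instance rather than once per record.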

## How was this patch tested?
Added tests in `CSVSuite`.

Author: Takeshi Yamamuro <yamam...@apache.org>

Closes #21631 from maropu/SPARK-24645.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd32b509
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd32b509
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd32b509

Branch: refs/heads/master
Commit: bd32b509a1728366494cba13f8f6612b7bd46ec0
Parents: c5aa54d
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Thu Jun 28 09:19:25 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Thu Jun 28 09:19:25 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/csv/UnivocityParser.scala   | 10 +++++++++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala    | 10 ++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd32b509/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 5f7d569..aa545e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -183,11 +183,19 @@ class UnivocityParser(
     }
   }
 
+  private val doParse = if (schema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` is enabled and only partition attributes are scanned,
+    // `schema` will be empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private def convert(tokens: Array[String]): InternalRow = {
     if (tokens.length != schema.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd32b509/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 365239d..84b91f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1569,4 +1569,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(testAppender2.events.asScala
      .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
   }
+
+  test("SPARK-24645 skip parsing when columnPruning enabled and partitions 
scanned only") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
+        checkAnswer(spark.read.csv(dir).selectExpr("sum(p)"), Row(5))
+      }
+    }
+  }
 }

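With the patch applied, the reproduction above should succeed instead of throwing. A minimal verification sketch for spark-shell (assumes the shell's `SparkSession` named `spark`; the expected value 5 follows from ids 0 through 9 with `id % 2`, where five ids are odd):

```scala
val dir = "/tmp/spark-csv/csv"
spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
// Scanning only the partition column no longer hits the NullPointerException;
// sum(p) adds up the five 1s, giving 5.
spark.read.csv(dir).selectExpr("sum(p)").collect()
```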
