This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 7ea3195  [SPARK-29101][SQL][2.4] Fix count API for csv file when DROPMALFORMED mode is selected
7ea3195 is described below

commit 7ea319521f1e7d0d54fd1d85d743e9717d73841d
Author: sandeep katta <sandeep.katta2...@gmail.com>
AuthorDate: Thu Sep 19 15:24:07 2019 -0700

    [SPARK-29101][SQL][2.4] Fix count API for csv file when DROPMALFORMED mode is selected
    
    ### What changes were proposed in this pull request?
    Example data set (`fruit.csv`):
    ```
    fruit,color,price,quantity
    apple,red,1,3
    banana,yellow,2,4
    orange,orange,3,5
    xxx
    ```
    
    This PR aims to fix the issue below: with `DROPMALFORMED` mode and CSV column pruning disabled, `count` returns 4 instead of the expected 3 because the malformed row is not dropped.
    ```
    scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
    scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").count
    res1: Long = 4
    ```
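
    With this fix in place, the same reproduction drops the malformed row. The sketch below is the anticipated output, based on the expectations encoded in the new unit test (the count stays 4 when column pruning is enabled, because rows are then not parsed at all, and becomes 3 when it is disabled):
    ```
    scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
    scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").count
    res1: Long = 3
    ```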
    
    This regression was introduced by [SPARK-24645](https://issues.apache.org/jira/browse/SPARK-24645).
    The issue that SPARK-24645 addressed can also be solved by [SPARK-25387](https://issues.apache.org/jira/browse/SPARK-25387), so that change can safely be reverted.
    
    ### Why are the changes needed?
    
    SPARK-24645 introduced this regression, so its change is reverted here; the issue that SPARK-24645 fixed is also covered by SPARK-25387.
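
    A condensed sketch of the resulting dispatch in `UnivocityParser` (the full change is in the diff below; the comments are explanatory and are not part of the committed code):
    ```
    private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
      // Column pruning is enabled and no columns are required (e.g. a plain count()),
      // so lines are never parsed and malformed rows cannot be detected or dropped.
      (_: String) => InternalRow.empty
    } else {
      // Otherwise every line is fully parsed, so DROPMALFORMED can discard
      // malformed rows such as "xxx" in the example data set above.
      (input: String) => convert(tokenizer.parseLine(input))
    }
    ```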
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Added a unit test, and also verified that the original SPARK-24645 scenario still behaves as expected.
    
    **SPARK-24645 regression check**
    
![image](https://user-images.githubusercontent.com/35216143/65067957-4c08ff00-d9a5-11e9-8d43-a4a23a61e8b8.png)
    
    Closes #25843 from sandeep-katta/SPARK-29101_branch2.4.
    
    Authored-by: sandeep katta <sandeep.katta2...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/datasources/csv/UnivocityParser.scala    |  7 ++++---
 sql/core/src/test/resources/test-data/malformedRow.csv     |  5 +++++
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 14 ++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 42e3964..69bd11f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,12 +203,13 @@ class UnivocityParser(
     }
   }
 
-  private val doParse = if (requiredSchema.nonEmpty) {
-    (input: String) => convert(tokenizer.parseLine(input))
-  } else {
+  private val doParse = if (options.columnPruning && requiredSchema.isEmpty) {
     // If `columnPruning` enabled and partition attributes scanned only,
     // `schema` gets empty.
     (_: String) => InternalRow.empty
+  } else {
+    // parse if the columnPruning is disabled or requiredSchema is nonEmpty
+    (input: String) => convert(tokenizer.parseLine(input))
   }
 
   /**
diff --git a/sql/core/src/test/resources/test-data/malformedRow.csv b/sql/core/src/test/resources/test-data/malformedRow.csv
new file mode 100644
index 0000000..8cfb3eef
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/malformedRow.csv
@@ -0,0 +1,5 @@
+fruit,color,price,quantity
+apple,red,1,3
+banana,yellow,2,4
+orange,orange,3,5
+malformedrow
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9d154..d714cb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -62,6 +62,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
+  private val malformedRowFile = "test-data/malformedRow.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -1861,4 +1862,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       }
     }
   }
+
+  test("SPARK-29101 test count with DROPMALFORMED mode") {
+    Seq((true, 4), (false, 3)).foreach { case (csvColumnPruning, expectedCount) =>
+      withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> csvColumnPruning.toString) {
+        val count = spark.read
+          .option("header", "true")
+          .option("mode", "DROPMALFORMED")
+          .csv(testFile(malformedRowFile))
+          .count()
+        assert(expectedCount == count)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
