This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 7ea3195 [SPARK-29101][SQL][2.4] Fix count API for csv file when DROPMALFORMED mode is selected 7ea3195 is described below commit 7ea319521f1e7d0d54fd1d85d743e9717d73841d Author: sandeep katta <sandeep.katta2...@gmail.com> AuthorDate: Thu Sep 19 15:24:07 2019 -0700 [SPARK-29101][SQL][2.4] Fix count API for csv file when DROPMALFORMED mode is selected ### What changes were proposed in this pull request? #DataSet fruit,color,price,quantity apple,red,1,3 banana,yellow,2,4 orange,orange,3,5 xxx This PR aims to fix the incorrect count shown below ``` scala> spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false) scala> spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv("fruit.csv").count res1: Long = 4 ``` This is caused by the issue [SPARK-24645](https://issues.apache.org/jira/browse/SPARK-24645). The SPARK-24645 issue can also be solved by [SPARK-25387](https://issues.apache.org/jira/browse/SPARK-25387) ### Why are the changes needed? SPARK-24645 caused this regression, so this PR reverts that code, as the original issue can also be solved by SPARK-25387 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added a UT, and also manually verified the SPARK-24645 bug fix **SPARK-24645 regression** ![image](https://user-images.githubusercontent.com/35216143/65067957-4c08ff00-d9a5-11e9-8d43-a4a23a61e8b8.png) Closes #25843 from sandeep-katta/SPARK-29101_branch2.4. 
Authored-by: sandeep katta <sandeep.katta2...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/datasources/csv/UnivocityParser.scala | 7 ++++--- sql/core/src/test/resources/test-data/malformedRow.csv | 5 +++++ .../spark/sql/execution/datasources/csv/CSVSuite.scala | 14 ++++++++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 42e3964..69bd11f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -203,12 +203,13 @@ class UnivocityParser( } } - private val doParse = if (requiredSchema.nonEmpty) { - (input: String) => convert(tokenizer.parseLine(input)) - } else { + private val doParse = if (options.columnPruning && requiredSchema.isEmpty) { // If `columnPruning` enabled and partition attributes scanned only, // `schema` gets empty. 
(_: String) => InternalRow.empty + } else { + // parse if the columnPruning is disabled or requiredSchema is nonEmpty + (input: String) => convert(tokenizer.parseLine(input)) } /** diff --git a/sql/core/src/test/resources/test-data/malformedRow.csv b/sql/core/src/test/resources/test-data/malformedRow.csv new file mode 100644 index 0000000..8cfb3eef --- /dev/null +++ b/sql/core/src/test/resources/test-data/malformedRow.csv @@ -0,0 +1,5 @@ +fruit,color,price,quantity +apple,red,1,3 +banana,yellow,2,4 +orange,orange,3,5 +malformedrow diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index df9d154..d714cb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -62,6 +62,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te private val datesFile = "test-data/dates.csv" private val unescapedQuotesFile = "test-data/unescaped-quotes.csv" private val valueMalformedFile = "test-data/value-malformed.csv" + private val malformedRowFile = "test-data/malformedRow.csv" /** Verifies data and schema. 
*/ private def verifyCars( @@ -1861,4 +1862,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } } + + test("SPARK-29101 test count with DROPMALFORMED mode") { + Seq((true, 4), (false, 3)).foreach { case (csvColumnPruning, expectedCount) => + withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> csvColumnPruning.toString) { + val count = spark.read + .option("header", "true") + .option("mode", "DROPMALFORMED") + .csv(testFile(malformedRowFile)) + .count() + assert(expectedCount == count) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org