Repository: spark
Updated Branches:
  refs/heads/master 3f0e801c1 -> 7a2d4895c


[SPARK-17916][SQL] Fix empty string being parsed as null when nullValue is set.

## What changes were proposed in this pull request?

This PR bumps the uniVocity parser to version 2.6.3, in which quoted empty 
strings are replaced by the empty value (passed to `setEmptyValue`) instead of 
`null`, as happens in the current version 2.5.9:
https://github.com/uniVocity/univocity-parsers/blob/v2.6.3/src/main/java/com/univocity/parsers/csv/CsvParser.java#L125

The writer's empty value is set to `""`, so an empty string in a 
DataFrame/Dataset is written as the quoted empty string `""`. The reader's 
empty value is set to the empty string (zero length), so a saved quoted empty 
string is read back as a plain empty string. See the tests for more details.
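
The round trip described above can be sketched as a small, dependency-free 
toy model. This is not the uniVocity or Spark code: the object and function 
names are hypothetical, and the final "a parsed value equal to `nullValue` 
becomes null" step is an assumption inferred from the expected results in the 
tests added by this patch.

```scala
// Toy model of the write/read round trip for a single string field.
// Hypothetical names; not the actual uniVocity or Spark implementation.
object EmptyVsNullRoundTrip {
  // Writer side: null is written as the nullValue marker,
  // an empty string is written as a quoted empty string.
  def writeField(value: String, nullValue: String): String =
    if (value == null) nullValue
    else if (value.isEmpty) "\"\""
    else value

  // Reader side: a quoted empty string maps to the reader's empty value (""),
  // an unquoted empty field maps to null. Assumption: any parsed value equal
  // to nullValue is then treated as null.
  def readField(raw: String, nullValue: String): String = {
    val parsed =
      if (raw == "\"\"") ""
      else if (raw.isEmpty) null
      else raw
    if (parsed == null || parsed == nullValue) null else parsed
  }

  // Writes and then reads back every value with the same nullValue.
  def roundTrip(values: Seq[String], nullValue: String): Seq[String] =
    values.map(v => readField(writeField(v, nullValue), nullValue))

  def main(args: Array[String]): Unit = {
    val names = Seq("John Doe", "", "-", null)
    println(roundTrip(names, "-")) // nullValue set to "-"
    println(roundTrip(names, ""))  // nullValue left as an empty string
  }
}
```

Under this model, with `nullValue` set to `-` the empty string survives the 
round trip and only `-` and null come back as null. With an empty `nullValue` 
the empty string still comes back as null, which matches the old behavior 
that the tests keep.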

Here are the main changes made in 
[2.6.0](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.0), 
[2.6.1](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.1), 
[2.6.2](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.2), 
[2.6.3](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.3):

- The CSV parser now parses quoted values ~30% faster
- The CSV format detection process has an option to provide a list of possible 
delimiters, in order of priority (e.g. 
`settings.detectFormatAutomatically('-', '.');`) - 
https://github.com/uniVocity/univocity-parsers/issues/214
- Implemented support for trimming quoted values - 
https://github.com/uniVocity/univocity-parsers/issues/230
- Fixed a NullPointerException when stopping the parser when nothing has been 
parsed - https://github.com/uniVocity/univocity-parsers/issues/219
- Fixed a concurrency issue when calling stopParsing() - 
https://github.com/uniVocity/univocity-parsers/issues/231

Closes #20068

## How was this patch tested?

Added tests from the PR https://github.com/apache/spark/pull/20068

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #21273 from MaxGekk/univocity-2.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a2d4895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a2d4895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a2d4895

Branch: refs/heads/master
Commit: 7a2d4895c75d4c232c377876b61c05a083eab3c8
Parents: 3f0e801
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Mon May 14 10:01:06 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Mon May 14 10:01:06 2018 +0800

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                  |  2 +-
 dev/deps/spark-deps-hadoop-2.7                  |  2 +-
 dev/deps/spark-deps-hadoop-3.1                  |  2 +-
 sql/core/pom.xml                                |  2 +-
 .../execution/datasources/csv/CSVOptions.scala  |  3 +-
 .../datasources/csv/CSVBenchmarks.scala         | 80 ++++++++++++++++++++
 .../execution/datasources/csv/CSVSuite.scala    | 46 +++++++++++
 7 files changed, 132 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index f552b81..e710e26 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -190,7 +190,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.5.9.jar
+univocity-parsers-2.6.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xercesImpl-2.9.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 024b1fc..97ad17a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -191,7 +191,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.5.9.jar
+univocity-parsers-2.6.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xercesImpl-2.9.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/dev/deps/spark-deps-hadoop-3.1
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 938de7b..e21bfef 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -211,7 +211,7 @@ stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
 token-provider-1.0.1.jar
-univocity-parsers-2.5.9.jar
+univocity-parsers-2.6.3.jar
 validation-api-1.1.0.Final.jar
 woodstox-core-5.0.3.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index ef41837..f270c70 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
-      <version>2.5.9</version>
+      <version>2.6.3</version>
       <type>jar</type>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index ed2dc65..1066d15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -164,7 +164,7 @@ class CSVOptions(
     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
     writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
     writerSettings.setNullValue(nullValue)
-    writerSettings.setEmptyValue(nullValue)
+    writerSettings.setEmptyValue("\"\"")
     writerSettings.setSkipEmptyLines(true)
     writerSettings.setQuoteAllFields(quoteAll)
     writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -185,6 +185,7 @@ class CSVOptions(
     settings.setInputBufferSize(inputBufferSize)
     settings.setMaxColumns(maxColumns)
     settings.setNullValue(nullValue)
+    settings.setEmptyValue("")
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
     settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
     settings

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
new file mode 100644
index 0000000..d442ba7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Column, Row, SparkSession}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure CSV read/write performance.
+ * To run this:
+ *  spark-submit --class <this class> --jars <spark sql test jar>
+ */
+object CSVBenchmarks {
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("benchmark-csv-datasource")
+    .config(conf)
+    .getOrCreate()
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark(s"Parsing quoted values", rowsNum)
+
+    withTempPath { path =>
+      val str = (0 until 10000).map(i => s""""$i"""").mkString(",")
+
+      spark.range(rowsNum)
+        .map(_ => str)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val schema = new StructType().add("value", StringType)
+      val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"One quoted string", numIters) { _ =>
+        ds.filter((_: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Parsing quoted values:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      --------------------------------------------------------------------------------------------
+      One quoted string                       30273 / 30549          0.0      605451.2       1.0X
+      */
+      benchmark.run()
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2d4895/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 461abdd..07e6c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1322,4 +1322,50 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val sampled = spark.read.option("inferSchema", true).option("samplingRatio", 1.0).csv(ds)
     assert(sampled.count() == ds.count())
   }
+
+  test("SPARK-17916: An empty string should not be coerced to null when nullValue is passed.") {
+    val litNull: String = null
+    val df = Seq(
+      (1, "John Doe"),
+      (2, ""),
+      (3, "-"),
+      (4, litNull)
+    ).toDF("id", "name")
+
+    // Checks for new behavior where an empty string is not coerced to null when `nullValue` is
+    // set to anything but an empty string literal.
+    withTempPath { path =>
+      df.write
+        .option("nullValue", "-")
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .option("nullValue", "-")
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, ""),
+        (3, litNull),
+        (4, litNull)
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+    // Keeps the old behavior where an empty string is coerced to null when nullValue is not passed.
+    withTempPath { path =>
+      df.write
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, litNull),
+        (3, "-"),
+        (4, litNull)
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+  }
 }

