Repository: spark
Updated Branches:
  refs/heads/master cfcd74668 -> ffe6fd77a


[SPARK-22818][SQL] csv escape of quote escape

## What changes were proposed in this pull request?

Escaping of the quote-escape character itself should be supported when using 
the uniVocity CSV encoding/decoding library.

Ref: 
https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters

One option is added for reading and writing CSV: `charToEscapeQuoteEscaping`

## How was this patch tested?

Unit test added.

Author: soonmok-kwon <soonmok.k...@navercorp.com>

Closes #20004 from ep1804/SPARK-22818.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffe6fd77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffe6fd77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffe6fd77

Branch: refs/heads/master
Commit: ffe6fd77a42043db8ebaf43d98059dcd28a53f1e
Parents: cfcd746
Author: soonmok-kwon <soonmok.k...@navercorp.com>
Authored: Fri Dec 29 07:30:06 2017 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Dec 29 07:30:06 2017 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 33 +++++++++++++-------
 python/pyspark/sql/streaming.py                 | 17 ++++++----
 .../org/apache/spark/sql/DataFrameReader.scala  | 11 ++++---
 .../org/apache/spark/sql/DataFrameWriter.scala  |  9 ++++--
 .../execution/datasources/csv/CSVOptions.scala  | 10 ++++++
 .../spark/sql/streaming/DataStreamReader.scala  | 11 ++++---
 .../execution/datasources/csv/CSVSuite.scala    | 31 ++++++++++++++++++
 7 files changed, 94 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 4e58bfb..49af1bc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -333,7 +333,7 @@ class DataFrameReader(OptionUtils):
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None,
-            columnNameOfCorruptRecord=None, multiLine=None):
+            columnNameOfCorruptRecord=None, multiLine=None, 
charToEscapeQuoteEscaping=None):
         """Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -344,17 +344,17 @@ class DataFrameReader(OptionUtils):
                      or RDD of Strings storing CSV rows.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for 
the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 
DOUBLE``).
-        :param sep: sets the single character as a separator for each field 
and value.
+        :param sep: sets a single character as a separator for each field and 
value.
                     If None is set, it uses the default value, ``,``.
         :param encoding: decodes the CSV files by the given encoding type. If 
None is set,
                          it uses the default value, ``UTF-8``.
-        :param quote: sets the single character used for escaping quoted 
values where the
+        :param quote: sets a single character used for escaping quoted values 
where the
                       separator can be part of the value. If None is set, it 
uses the default
                       value, ``"``. If you would like to turn off quotations, 
you need to set an
                       empty string.
-        :param escape: sets the single character used for escaping quotes 
inside an already
+        :param escape: sets a single character used for escaping quotes inside 
an already
                        quoted value. If None is set, it uses the default 
value, ``\``.
-        :param comment: sets the single character used for skipping lines 
beginning with this
+        :param comment: sets a single character used for skipping lines 
beginning with this
                         character. By default (None), it is disabled.
         :param header: uses the first line as names of columns. If None is 
set, it uses the
                        default value, ``false``.
@@ -410,6 +410,10 @@ class DataFrameReader(OptionUtils):
                                           
``spark.sql.columnNameOfCorruptRecord``.
         :param multiLine: parse records, which may span multiple lines. If 
None is
                           set, it uses the default value, ``false``.
+        :param charToEscapeQuoteEscaping: sets a single character used for 
escaping the escape for
+                                          the quote character. If None is set, 
the default value is
+                                          escape character when escape and 
quote characters are
+                                          different, ``\0`` otherwise.
 
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
@@ -427,7 +431,8 @@ class DataFrameReader(OptionUtils):
             dateFormat=dateFormat, timestampFormat=timestampFormat, 
maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
-            columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine)
+            columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine,
+            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -814,7 +819,8 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, 
escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, 
dateFormat=None,
-            timestampFormat=None, ignoreLeadingWhiteSpace=None, 
ignoreTrailingWhiteSpace=None):
+            timestampFormat=None, ignoreLeadingWhiteSpace=None, 
ignoreTrailingWhiteSpace=None,
+            charToEscapeQuoteEscaping=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the 
specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -829,12 +835,12 @@ class DataFrameWriter(OptionUtils):
         :param compression: compression codec to use when saving to file. This 
can be one of the
                             known case-insensitive shorten names (none, bzip2, 
gzip, lz4,
                             snappy and deflate).
-        :param sep: sets the single character as a separator for each field 
and value. If None is
+        :param sep: sets a single character as a separator for each field and 
value. If None is
                     set, it uses the default value, ``,``.
-        :param quote: sets the single character used for escaping quoted 
values where the
+        :param quote: sets a single character used for escaping quoted values 
where the
                       separator can be part of the value. If None is set, it 
uses the default
                       value, ``"``. If an empty string is set, it uses 
``u0000`` (null character).
-        :param escape: sets the single character used for escaping quotes 
inside an already
+        :param escape: sets a single character used for escaping quotes inside 
an already
                        quoted value. If None is set, it uses the default 
value, ``\``
         :param escapeQuotes: a flag indicating whether values containing 
quotes should always
                              be enclosed in quotes. If None is set, it uses 
the default value
@@ -860,6 +866,10 @@ class DataFrameWriter(OptionUtils):
         :param ignoreTrailingWhiteSpace: a flag indicating whether or not 
trailing whitespaces from
                                          values being written should be 
skipped. If None is set, it
                                          uses the default value, ``true``.
+        :param charToEscapeQuoteEscaping: sets a single character used for 
escaping the escape for
+                                          the quote character. If None is set, 
the default value is
+                                          escape character when escape and 
quote characters are
+                                          different, ``\0`` otherwise.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
@@ -868,7 +878,8 @@ class DataFrameWriter(OptionUtils):
                        nullValue=nullValue, escapeQuotes=escapeQuotes, 
quoteAll=quoteAll,
                        dateFormat=dateFormat, timestampFormat=timestampFormat,
                        ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
-                       ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace)
+                       ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
+                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index d0aba28..fb228f9 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -560,7 +560,7 @@ class DataStreamReader(OptionUtils):
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None,
-            columnNameOfCorruptRecord=None, multiLine=None):
+            columnNameOfCorruptRecord=None, multiLine=None, 
charToEscapeQuoteEscaping=None):
         """Loads a CSV file stream and returns the result as a  
:class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -572,17 +572,17 @@ class DataStreamReader(OptionUtils):
         :param path: string, or list of strings, for input path(s).
         :param schema: an optional :class:`pyspark.sql.types.StructType` for 
the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 
DOUBLE``).
-        :param sep: sets the single character as a separator for each field 
and value.
+        :param sep: sets a single character as a separator for each field and 
value.
                     If None is set, it uses the default value, ``,``.
         :param encoding: decodes the CSV files by the given encoding type. If 
None is set,
                          it uses the default value, ``UTF-8``.
-        :param quote: sets the single character used for escaping quoted 
values where the
+        :param quote: sets a single character used for escaping quoted values 
where the
                       separator can be part of the value. If None is set, it 
uses the default
                       value, ``"``. If you would like to turn off quotations, 
you need to set an
                       empty string.
-        :param escape: sets the single character used for escaping quotes 
inside an already
+        :param escape: sets a single character used for escaping quotes inside 
an already
                        quoted value. If None is set, it uses the default 
value, ``\``.
-        :param comment: sets the single character used for skipping lines 
beginning with this
+        :param comment: sets a single character used for skipping lines 
beginning with this
                         character. By default (None), it is disabled.
         :param header: uses the first line as names of columns. If None is 
set, it uses the
                        default value, ``false``.
@@ -638,6 +638,10 @@ class DataStreamReader(OptionUtils):
                                           
``spark.sql.columnNameOfCorruptRecord``.
         :param multiLine: parse one record, which may span multiple lines. If 
None is
                           set, it uses the default value, ``false``.
+        :param charToEscapeQuoteEscaping: sets a single character used for 
escaping the escape for
+                                          the quote character. If None is set, 
the default value is
+                                          escape character when escape and 
quote characters are
+                                          different, ``\0`` otherwise.
 
         >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> csv_sdf.isStreaming
@@ -653,7 +657,8 @@ class DataStreamReader(OptionUtils):
             dateFormat=dateFormat, timestampFormat=timestampFormat, 
maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
-            columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine)
+            columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine,
+            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c43ee91..e8d683a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -517,17 +517,20 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * You can set the following CSV-specific options to deal with CSV files:
    * <ul>
-   * <li>`sep` (default `,`): sets the single character as a separator for each
+   * <li>`sep` (default `,`): sets a single character as a separator for each
    * field and value.</li>
    * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given 
encoding
    * type.</li>
-   * <li>`quote` (default `"`): sets the single character used for escaping 
quoted values where
+   * <li>`quote` (default `"`): sets a single character used for escaping 
quoted values where
    * the separator can be part of the value. If you would like to turn off 
quotations, you need to
    * set not `null` but an empty string. This behaviour is different from
    * `com.databricks.spark.csv`.</li>
-   * <li>`escape` (default `\`): sets the single character used for escaping 
quotes inside
+   * <li>`escape` (default `\`): sets a single character used for escaping 
quotes inside
    * an already quoted value.</li>
-   * <li>`comment` (default empty string): sets the single character used for 
skipping lines
+   * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single 
character used for
+   * escaping the escape for the quote character. The default value is escape 
character when escape
+   * and quote characters are different, `\0` otherwise.</li>
+   * <li>`comment` (default empty string): sets a single character used for 
skipping lines
    * beginning with this character. By default, it is disabled.</li>
    * <li>`header` (default `false`): uses the first line as names of 
columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema 
automatically from data. It

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7ccda0a..bd216ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -594,13 +594,16 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
    *
    * You can set the following CSV-specific option(s) for writing CSV files:
    * <ul>
-   * <li>`sep` (default `,`): sets the single character as a separator for each
+   * <li>`sep` (default `,`): sets a single character as a separator for each
    * field and value.</li>
-   * <li>`quote` (default `"`): sets the single character used for escaping 
quoted values where
+   * <li>`quote` (default `"`): sets a single character used for escaping 
quoted values where
    * the separator can be part of the value. If an empty string is set, it 
uses `u0000`
    * (null character).</li>
-   * <li>`escape` (default `\`): sets the single character used for escaping 
quotes inside
+   * <li>`escape` (default `\`): sets a single character used for escaping 
quotes inside
    * an already quoted value.</li>
+   * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single 
character used for
+   * escaping the escape for the quote character. The default value is escape 
character when escape
+   * and quote characters are different, `\0` otherwise.</li>
    * <li>`escapeQuotes` (default `true`): a flag indicating whether values 
containing
    * quotes should always be enclosed in quotes. Default is to escape all 
values containing
    * a quote character.</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index a13a5a3..c167906 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -89,6 +89,14 @@ class CSVOptions(
 
   val quote = getChar("quote", '\"')
   val escape = getChar("escape", '\\')
+  val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") 
match {
+    case None => None
+    case Some(null) => None
+    case Some(value) if value.length == 0 => None
+    case Some(value) if value.length == 1 => Some(value.charAt(0))
+    case _ =>
+      throw new RuntimeException("charToEscapeQuoteEscaping cannot be more 
than one character")
+  }
   val comment = getChar("comment", '\u0000')
 
   val headerFlag = getBool("header")
@@ -148,6 +156,7 @@ class CSVOptions(
     format.setDelimiter(delimiter)
     format.setQuote(quote)
     format.setQuoteEscape(escape)
+    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
     format.setComment(comment)
     
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
     
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
@@ -165,6 +174,7 @@ class CSVOptions(
     format.setDelimiter(delimiter)
     format.setQuote(quote)
     format.setQuoteEscape(escape)
+    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
     format.setComment(comment)
     settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
     settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index acd5ca1..2e92bee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -262,17 +262,20 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
-   * <li>`sep` (default `,`): sets the single character as a separator for each
+   * <li>`sep` (default `,`): sets a single character as a separator for each
    * field and value.</li>
    * <li>`encoding` (default `UTF-8`): decodes the CSV files by the given 
encoding
    * type.</li>
-   * <li>`quote` (default `"`): sets the single character used for escaping 
quoted values where
+   * <li>`quote` (default `"`): sets a single character used for escaping 
quoted values where
    * the separator can be part of the value. If you would like to turn off 
quotations, you need to
    * set not `null` but an empty string. This behaviour is different from
    * `com.databricks.spark.csv`.</li>
-   * <li>`escape` (default `\`): sets the single character used for escaping 
quotes inside
+   * <li>`escape` (default `\`): sets a single character used for escaping 
quotes inside
    * an already quoted value.</li>
-   * <li>`comment` (default empty string): sets the single character used for 
skipping lines
+   * <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single 
character used for
+   * escaping the escape for the quote character. The default value is escape 
character when escape
+   * and quote characters are different, `\0` otherwise.</li>
+   * <li>`comment` (default empty string): sets a single character used for 
skipping lines
    * beginning with this character. By default, it is disabled.</li>
    * <li>`header` (default `false`): uses the first line as names of 
columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema 
automatically from data. It

http://git-wip-us.apache.org/repos/asf/spark/blob/ffe6fd77/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 4fe4542..4398e54 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -482,6 +482,37 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
     }
   }
 
+  test("save csv with quote escaping, using charToEscapeQuoteEscaping option") 
{
+    withTempPath { path =>
+
+      // original text
+      val df1 = Seq(
+        """You are "beautiful"""",
+        """Yes, \"in the inside"\"""
+      ).toDF()
+
+      // text written in CSV with following options:
+      // quote character: "
+      // escape character: \
+      // character to escape quote escaping: #
+      val df2 = Seq(
+        """"You are \"beautiful\""""",
+        """"Yes, #\\"in the inside\"#\""""
+      ).toDF()
+
+      df2.coalesce(1).write.text(path.getAbsolutePath)
+
+      val df3 = spark.read
+        .format("csv")
+        .option("quote", "\"")
+        .option("escape", "\\")
+        .option("charToEscapeQuoteEscaping", "#")
+        .load(path.getAbsolutePath)
+
+      checkAnswer(df1, df3)
+    }
+  }
+
   test("commented lines in CSV data") {
     Seq("false", "true").foreach { multiLine =>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to