Repository: spark
Updated Branches:
  refs/heads/master 12cd00706 -> 07c12c09a

[SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options in CSV writing

## What changes were proposed in this pull request?

This PR proposes to support _not_ trimming white spaces when writing out CSV.

These options are `false` by default in the CSV reading path, but the univocity parser defaults them to `true` for CSV writing. Neither `ignoreLeadingWhiteSpace` nor `ignoreTrailingWhiteSpace` is currently applied when writing, so leading and trailing white spaces are always trimmed. There should be an easy way to keep these white spaces.

With the data below:

```scala
val df = spark.read.csv(Seq("a , b , c").toDS)
df.show()
```

```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b | c|
+---+----+---+
```

**Before**

```scala
df.write.csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+-----+
|value|
+-----+
|a,b,c|
+-----+
```

This cannot be worked around with `quoteAll` either:

```scala
df.write.option("quoteAll", true).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+-----------+
| value|
+-----------+
|"a","b","c"|
+-----------+
```

**After**

```scala
df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+----------+
| value|
+----------+
|a , b , c|
+----------+
```

Note that this is possible in R:

```r
> system("cat text.csv")
f1,f2,f3
a , b , c
> df <- read.csv(file="text.csv")
> df
f1 f2 f3
1 a b c
> write.csv(df, file="text1.csv", quote=F, row.names=F)
> system("cat text1.csv")
f1,f2,f3
a , b , c
```

## How was this patch tested?

Unit tests in `CSVSuite` and manual tests for Python.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #17310 from HyukjinKwon/SPARK-18579.
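
The key point of the description is that the same option key resolves to a different default depending on direction: white space is kept on read and trimmed on write unless the user sets the option. A minimal, hypothetical Python model of that split follows. `get_bool` and `whitespace_flags` are illustrative names, not Spark APIs. The real resolution happens in Scala inside `CSVOptions`, and the error handling here is an assumption.

```python
# Hypothetical model of the read/write default split; not Spark's actual code.
from typing import Mapping, Tuple


def get_bool(options: Mapping[str, str], name: str, default: bool) -> bool:
    """Parse a "true"/"false" option value, falling back to `default` if absent."""
    value = options.get(name)
    if value is None:
        return default
    lowered = value.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise ValueError(f"{name} flag can be true or false")


def whitespace_flags(options: Mapping[str, str], writing: bool) -> Tuple[bool, bool]:
    """Return (ignore_leading, ignore_trailing) for a read or a write."""
    # Reads default to keeping white space (False); writes default to trimming
    # it (True), which preserves the behavior from before this change.
    default = writing
    return (get_bool(options, "ignoreLeadingWhiteSpace", default),
            get_bool(options, "ignoreTrailingWhiteSpace", default))


# Same (empty) options, different defaults depending on direction.
print(whitespace_flags({}, writing=False))  # (False, False)
print(whitespace_flags({}, writing=True))   # (True, True)
# Explicitly set options take precedence over the write-side defaults.
print(whitespace_flags({"ignoreLeadingWhiteSpace": "false",
                        "ignoreTrailingWhiteSpace": "false"}, writing=True))  # (False, False)
```

Keeping one option name with two defaults lets the existing read behavior stay as-is while still letting writers opt out of trimming.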

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07c12c09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07c12c09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07c12c09

Branch: refs/heads/master
Commit: 07c12c09a75645f6b56b30654455b3838b7b6637
Parents: 12cd007
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu Mar 23 00:25:01 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Thu Mar 23 00:25:01 2017 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py               | 28 ++++++----
 python/pyspark/sql/streaming.py                | 12 ++---
 python/pyspark/sql/tests.py                    | 13 +++++
 .../org/apache/spark/sql/DataFrameReader.scala |  6 +--
 .../org/apache/spark/sql/DataFrameWriter.scala |  6 ++-
 .../execution/datasources/csv/CSVOptions.scala | 15 ++++--
 .../spark/sql/streaming/DataStreamReader.scala |  6 +--
 .../execution/datasources/csv/CSVSuite.scala   | 57 ++++++++++++++++++++
 8 files changed, 116 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 759c275..5e732b4 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -341,12 +341,12 @@ class DataFrameReader(OptionUtils):
                        default value, ``false``.
         :param inferSchema: infers the input schema automatically from data. It requires one extra
                        pass over the data. If None is set, it uses the default value, ``false``.
-        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
-                                        being read should be skipped. If None is set, it uses
-                                        the default value, ``false``.
-        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
-                                         being read should be skipped. If None is set, it uses
-                                         the default value, ``false``.
+        :param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
+                                        values being read should be skipped. If None is set, it
+                                        uses the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: A flag indicating whether or not trailing whitespaces from
+                                         values being read should be skipped. If None is set, it
+                                         uses the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                           the default value, empty string. Since 2.0.1, this ``nullValue`` param
                           applies to all supported types including the string type.
@@ -706,7 +706,7 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -728,10 +728,10 @@ class DataFrameWriter(OptionUtils):
                       empty string.
         :param escape: sets the single character used for escaping quotes inside an already
                        quoted value. If None is set, it uses the default value, ``\``
-        :param escapeQuotes: A flag indicating whether values containing quotes should always
+        :param escapeQuotes: a flag indicating whether values containing quotes should always
                              be enclosed in quotes. If None is set, it uses the default value
                              ``true``, escaping all values containing a quote character.
-        :param quoteAll: A flag indicating whether all values should always be enclosed in
+        :param quoteAll: a flag indicating whether all values should always be enclosed in
                          quotes. If None is set, it uses the default value ``false``,
                          only escaping values containing a quote character.
         :param header: writes the names of columns as the first line. If None is set, it uses
@@ -746,13 +746,21 @@ class DataFrameWriter(OptionUtils):
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
+                                        values being written should be skipped. If None is set, it
+                                        uses the default value, ``true``.
+        :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
+                                         values being written should be skipped. If None is set, it
+                                         uses the default value, ``true``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat,
+                       ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+                       ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e227f9c..80f4340 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -597,12 +597,12 @@ class DataStreamReader(OptionUtils):
                        default value, ``false``.
         :param inferSchema: infers the input schema automatically from data. It requires one extra
                        pass over the data. If None is set, it uses the default value, ``false``.
-        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
-                                        being read should be skipped. If None is set, it uses
-                                        the default value, ``false``.
-        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
-                                         being read should be skipped. If None is set, it uses
-                                         the default value, ``false``.
+        :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
+                                        values being read should be skipped. If None is set, it
+                                        uses the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
+                                         values being read should be skipped. If None is set, it
+                                         uses the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                           the default value, empty string. Since 2.0.1, this ``nullValue`` param
                           applies to all supported types including the string type.

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f0a9a04..29d613b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -450,6 +450,19 @@ class SQLTests(ReusedPySparkTestCase):
                     Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
         self.assertEqual(ages_newlines.collect(), expected)
 
+    def test_ignorewhitespace_csv(self):
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.spark.createDataFrame([[" a", "b ", " c "]]).write.csv(
+            tmpPath,
+            ignoreLeadingWhiteSpace=False,
+            ignoreTrailingWhiteSpace=False)
+
+        expected = [Row(value=u' a,b , c ')]
+        readback = self.spark.read.text(tmpPath)
+        self.assertEqual(readback.collect(), expected)
+        shutil.rmtree(tmpPath)
+
     def test_read_multiple_orc_file(self):
         df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
                                  "python/test_support/sql/orc_partitioned/b=1/c=1"])

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e39b4d9..e6d2b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -489,9 +489,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`header` (default `false`): uses the first line as names of columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
    * requires one extra pass over the data.</li>
-   * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
-   * from values being read should be skipped.</li>
-   * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+   * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+   * whitespaces from values being read should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3e975ef..e973d0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -573,7 +573,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`escapeQuotes` (default `true`): a flag indicating whether values containing
    * quotes should always be enclosed in quotes. Default is to escape all values containing
    * a quote character.</li>
-   * <li>`quoteAll` (default `false`): A flag indicating whether all values should always be
+   * <li>`quoteAll` (default `false`): a flag indicating whether all values should always be
    * enclosed in quotes. Default is to only escape values containing a quote character.</li>
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
@@ -586,6 +586,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`ignoreLeadingWhiteSpace` (default `true`): a flag indicating whether or not leading
+   * whitespaces from values being written should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating defines whether or not
+   * trailing whitespaces from values being written should be skipped.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 5d2c23e..e7b79e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -93,8 +93,13 @@ class CSVOptions(
 
   val headerFlag = getBool("header")
   val inferSchemaFlag = getBool("inferSchema")
-  val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
-  val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
+  val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
+  val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
+
+  // For write, both options were `true` by default. We leave it as `true` for
+  // backwards compatibility.
+  val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
+  val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)
 
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -144,6 +149,8 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     format.setComment(comment)
+    writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
+    writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
     writerSettings.setNullValue(nullValue)
     writerSettings.setEmptyValue(nullValue)
     writerSettings.setSkipEmptyLines(true)
@@ -159,8 +166,8 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     format.setComment(comment)
-    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlag)
-    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlag)
+    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
+    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
     settings.setReadInputOnSeparateThread(false)
     settings.setInputBufferSize(inputBufferSize)
     settings.setMaxColumns(maxColumns)

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f6e2fef..997ca28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -238,9 +238,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`header` (default `false`): uses the first line as names of columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
    * requires one extra pass over the data.</li>
-   * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
-   * from values being read should be skipped.</li>
-   * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+   * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+   * whitespaces from values being read should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/07c12c09/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2600894..d70c47f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1117,4 +1117,61 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(df2.schema === schema)
   }
 
+  test("ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - read") {
+    val input = " a,b , c "
+
+    // For reading, default of both `ignoreLeadingWhiteSpace` and`ignoreTrailingWhiteSpace`
+    // are `false`. So, these are excluded.
+    val combinations = Seq(
+      (true, true),
+      (false, true),
+      (true, false))
+
+    // Check if read rows ignore whitespaces as configured.
+    val expectedRows = Seq(
+      Row("a", "b", "c"),
+      Row(" a", "b", " c"),
+      Row("a", "b ", "c "))
+
+    combinations.zip(expectedRows)
+      .foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
+        val df = spark.read
+          .option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+          .option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+          .csv(Seq(input).toDS())
+
+        checkAnswer(df, expected)
+      }
+  }
+
+  test("SPARK-18579: ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - write") {
+    val df = Seq((" a", "b ", " c ")).toDF()
+
+    // For writing, default of both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace`
+    // are `true`. So, these are excluded.
+    val combinations = Seq(
+      (false, false),
+      (false, true),
+      (true, false))
+
+    // Check if written lines ignore each whitespaces as configured.
+    val expectedLines = Seq(
+      " a,b , c ",
+      " a,b, c",
+      "a,b ,c ")
+
+    combinations.zip(expectedLines)
+      .foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
+        withTempPath { path =>
+          df.write
+            .option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+            .option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+            .csv(path.getAbsolutePath)
+
+          // Read back the written lines.
+          val readBack = spark.read.text(path.getAbsolutePath)
+          checkAnswer(readBack, Row(expected))
+        }
+      }
+  }
 }
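
The flag combinations exercised by the two `CSVSuite` tests above can be reproduced with a simplified, pure-Python model of the trimming step. This is an assumption-laden sketch, not Spark or univocity code. `trim_field`, `write_csv_line`, and `read_csv_line` are hypothetical helpers. Quoting, escaping, and null handling are ignored. Python's `str.lstrip`/`str.rstrip` may also treat a different set of characters as white space than univocity does.

```python
# Simplified, hypothetical model of CSV whitespace trimming; not Spark/univocity code.
# Quoting, escaping and null handling are deliberately left out.
from typing import List, Sequence


def trim_field(value: str, ignore_leading: bool, ignore_trailing: bool) -> str:
    """Drop leading and/or trailing white space from a single field."""
    if ignore_leading:
        value = value.lstrip()
    if ignore_trailing:
        value = value.rstrip()
    return value


def write_csv_line(values: Sequence[str], ignore_leading: bool,
                   ignore_trailing: bool, sep: str = ",") -> str:
    """Trim each field as configured, then join the fields into one line."""
    return sep.join(trim_field(v, ignore_leading, ignore_trailing) for v in values)


def read_csv_line(line: str, ignore_leading: bool,
                  ignore_trailing: bool, sep: str = ",") -> List[str]:
    """Split a line on the separator, then trim each field as configured."""
    return [trim_field(f, ignore_leading, ignore_trailing) for f in line.split(sep)]


if __name__ == "__main__":
    row = [" a", "b ", " c "]
    # The three non-default write combinations used by the write test.
    for leading, trailing in [(False, False), (False, True), (True, False)]:
        print(repr(write_csv_line(row, leading, trailing)))
    # ' a,b , c '
    # ' a,b, c'
    # 'a,b ,c '
```

The three printed lines match `expectedLines` in the write test. Calling `read_csv_line(" a,b , c ", ...)` with the read test's `(true, true)`, `(false, true)` and `(true, false)` combinations yields the fields in `expectedRows`.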