Repository: spark Updated Branches: refs/heads/master afb062753 -> 78e0a725e
[SPARK-19018][SQL] Add support for custom encoding on csv writer ## What changes were proposed in this pull request? Add support for custom encoding on csv writer, see https://issues.apache.org/jira/browse/SPARK-19018 ## How was this patch tested? Added two unit tests in CSVSuite Author: crafty-coder <carlosp...@gmail.com> Author: Carlos <crafty-co...@users.noreply.github.com> Closes #20949 from crafty-coder/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78e0a725 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78e0a725 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78e0a725 Branch: refs/heads/master Commit: 78e0a725e06665cf92d4b8f987ee01947a1d620c Parents: afb0627 Author: crafty-coder <carlosp...@gmail.com> Authored: Wed Jul 25 14:17:20 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Wed Jul 25 14:17:20 2018 +0800 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 7 +++- .../org/apache/spark/sql/DataFrameWriter.scala | 2 + .../datasources/csv/CSVFileFormat.scala | 6 ++- .../execution/datasources/csv/CSVSuite.scala | 39 +++++++++++++++++++- 4 files changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 3efe2ad..98b2cd9 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -859,7 +859,7 @@ class DataFrameWriter(OptionUtils): def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, 
ignoreTrailingWhiteSpace=None, - charToEscapeQuoteEscaping=None): + charToEscapeQuoteEscaping=None, encoding=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -909,6 +909,8 @@ class DataFrameWriter(OptionUtils): the quote character. If None is set, the default value is escape character when escape and quote characters are different, ``\0`` otherwise.. + :param encoding: sets the encoding (charset) of saved csv files. If None is set, + the default UTF-8 charset will be used. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -918,7 +920,8 @@ class DataFrameWriter(OptionUtils): dateFormat=dateFormat, timestampFormat=timestampFormat, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, - charToEscapeQuoteEscaping=charToEscapeQuoteEscaping) + charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, + encoding=encoding) self._jwrite.csv(path) @since(1.5) http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 90bea2d..b9fa43f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -629,6 +629,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * enclosed in quotes. Default is to only escape values containing a quote character.</li> * <li>`header` (default `false`): writes the names of columns as the first line.</li> * <li>`nullValue` (default empty string): sets the string representation of a null value.</li> + * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv + * files. 
If it is not set, the UTF-8 charset will be used.</li> * <li>`compression` (default `null`): compression codec to use when saving to file. This can be * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`, * `snappy` and `deflate`). </li> http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index aeb40e5..d59b982 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.csv +import java.nio.charset.Charset + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce._ @@ -168,7 +170,9 @@ private[csv] class CsvOutputWriter( context: TaskAttemptContext, params: CSVOptions) extends OutputWriter with Logging { - private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) + private val charset = Charset.forName(params.charset) + + private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) private val gen = new UnivocityGenerator(dataSchema, writer, params) http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala 
index 63cc598..456b453 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.File -import java.nio.charset.UnsupportedCharsetException +import java.nio.charset.{Charset, UnsupportedCharsetException} +import java.nio.file.Files import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.Locale import scala.collection.JavaConverters._ +import scala.util.Properties import org.apache.commons.lang3.time.FastDateFormat import org.apache.hadoop.io.SequenceFile.CompressionType @@ -514,6 +516,41 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } + test("SPARK-19018: Save csv with custom charset") { + + // scalastyle:off nonascii + val content = "µà áâä ÁÂÄ" + // scalastyle:on nonascii + + Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding => + withTempPath { path => + val csvDir = new File(path, "csv") + Seq(content).toDF().write + .option("encoding", encoding) + .csv(csvDir.getCanonicalPath) + + csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile => + val readback = Files.readAllBytes(csvFile.toPath) + val expected = (content + Properties.lineSeparator).getBytes(Charset.forName(encoding)) + assert(readback === expected) + }) + } + } + } + + test("SPARK-19018: error handling for unsupported charsets") { + val exception = intercept[SparkException] { + withTempPath { path => + val csvDir = new File(path, "csv").getCanonicalPath + Seq("a,A,c,A,b,B").toDF().write + .option("encoding", "1-9588-osi") + .csv(csvDir) + } + } + + assert(exception.getCause.getMessage.contains("1-9588-osi")) + } + test("commented lines in CSV data") { Seq("false", "true").foreach { multiLine => --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org