Repository: spark Updated Branches: refs/heads/master d77c4e6e2 -> 7580f3041
[SPARK-16104] [SQL] Do not create CSV writer object for every flush when writing ## What changes were proposed in this pull request? This PR lets the `CsvWriter` object be reused instead of being created each time. This approach was adopted from the JSON data source. Originally a `CsvWriter` was being created for each row, but that was improved in https://github.com/apache/spark/pull/13229. However, it still creates a `CsvWriter` object for each `flush()` in `LineCsvWriter`. It seems the object does not have to be closed and re-created for every flush. It follows the original logic as-is, but `CsvWriter` is reused by resetting the `CharArrayWriter`. ## How was this patch tested? Existing tests should cover this. Author: hyukjinkwon <gurwls...@gmail.com> Closes #13809 from HyukjinKwon/write-perf. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7580f304 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7580f304 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7580f304 Branch: refs/heads/master Commit: 7580f3041a1a3757a0b14b9d8afeb720f261fff6 Parents: d77c4e6 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Tue Jun 21 21:58:38 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Tue Jun 21 21:58:38 2016 -0700 ---------------------------------------------------------------------- .../execution/datasources/csv/CSVParser.scala | 20 +++++++++----------- .../execution/datasources/csv/CSVRelation.scala | 1 + 2 files changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7580f304/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index b06f123..2103262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} -import java.nio.charset.StandardCharsets +import java.io.{CharArrayWriter, StringReader} import com.univocity.parsers.csv._ @@ -77,10 +76,8 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten writerSettings.setHeaders(headers: _*) writerSettings.setQuoteEscapingEnabled(params.escapeQuotes) - private var buffer = new ByteArrayOutputStream() - private var writer = new CsvWriter( - new OutputStreamWriter(buffer, StandardCharsets.UTF_8), - writerSettings) + private val buffer = new CharArrayWriter() + private val writer = new CsvWriter(buffer, writerSettings) def writeRow(row: Seq[String], includeHeader: Boolean): Unit = { if (includeHeader) { @@ -90,14 +87,15 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten } def flush(): String = { - writer.close() + writer.flush() val lines = buffer.toString.stripLineEnd - buffer = new ByteArrayOutputStream() - writer = new CsvWriter( - new OutputStreamWriter(buffer, StandardCharsets.UTF_8), - writerSettings) + buffer.reset() lines } + + def close(): Unit = { + writer.close() + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/7580f304/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 083ac33..e8c0134 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -223,6 +223,7 @@ private[sql] class CsvOutputWriter( override def close(): Unit = { flush() + csvWriter.close() recordWriter.close(context) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org