Repository: spark
Updated Branches:
  refs/heads/master d77c4e6e2 -> 7580f3041


[SPARK-16104] [SQL] Do not creaate CSV writer object for every flush when 
writing

## What changes were proposed in this pull request?

This PR let `CsvWriter` object is not created for each time but able to be 
reused. This way was taken after from JSON data source.

Original `CsvWriter` was being created for each row but it was enhanced in 
https://github.com/apache/spark/pull/13229. However, it still creates 
`CsvWriter` object for each `flush()` in `LineCsvWriter`. It seems it does not 
have to close the object and re-create this for every flush.

It follows the original logic as it is but `CsvWriter` is reused by reseting 
`CharArrayWriter`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13809 from HyukjinKwon/write-perf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7580f304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7580f304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7580f304

Branch: refs/heads/master
Commit: 7580f3041a1a3757a0b14b9d8afeb720f261fff6
Parents: d77c4e6
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Jun 21 21:58:38 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Jun 21 21:58:38 2016 -0700

----------------------------------------------------------------------
 .../execution/datasources/csv/CSVParser.scala   | 20 +++++++++-----------
 .../execution/datasources/csv/CSVRelation.scala |  1 +
 2 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7580f304/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
index b06f123..2103262 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader}
-import java.nio.charset.StandardCharsets
+import java.io.{CharArrayWriter, StringReader}
 
 import com.univocity.parsers.csv._
 
@@ -77,10 +76,8 @@ private[sql] class LineCsvWriter(params: CSVOptions, 
headers: Seq[String]) exten
   writerSettings.setHeaders(headers: _*)
   writerSettings.setQuoteEscapingEnabled(params.escapeQuotes)
 
-  private var buffer = new ByteArrayOutputStream()
-  private var writer = new CsvWriter(
-    new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
-    writerSettings)
+  private val buffer = new CharArrayWriter()
+  private val writer = new CsvWriter(buffer, writerSettings)
 
   def writeRow(row: Seq[String], includeHeader: Boolean): Unit = {
     if (includeHeader) {
@@ -90,14 +87,15 @@ private[sql] class LineCsvWriter(params: CSVOptions, 
headers: Seq[String]) exten
   }
 
   def flush(): String = {
-    writer.close()
+    writer.flush()
     val lines = buffer.toString.stripLineEnd
-    buffer = new ByteArrayOutputStream()
-    writer = new CsvWriter(
-      new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
-      writerSettings)
+    buffer.reset()
     lines
   }
+
+  def close(): Unit = {
+    writer.close()
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7580f304/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 083ac33..e8c0134 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -223,6 +223,7 @@ private[sql] class CsvOutputWriter(
 
   override def close(): Unit = {
     flush()
+    csvWriter.close()
     recordWriter.close(context)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to