Repository: spark Updated Branches: refs/heads/master 031bd80e4 -> c7d95cced
[SPARK-26208][SQL] add headers to empty csv files when header=true ## What changes were proposed in this pull request? Add headers to empty csv files when header=true, because otherwise these files are invalid when reading. ## How was this patch tested? Added test for roundtrip of empty dataframe to csv file with headers and back in CSVSuite Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #23173 from koertkuipers/feat-empty-csv-with-header. Authored-by: Koert Kuipers <ko...@tresata.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7d95cce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7d95cce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7d95cce Branch: refs/heads/master Commit: c7d95ccedf593edf9fda9ecaf8d0b4dda451440d Parents: 031bd80 Author: Koert Kuipers <ko...@tresata.com> Authored: Sun Dec 2 17:38:25 2018 +0800 Committer: Hyukjin Kwon <gurwls...@apache.org> Committed: Sun Dec 2 17:38:25 2018 +0800 ---------------------------------------------------------------------- .../sql/catalyst/csv/UnivocityGenerator.scala | 9 ++++---- .../datasources/csv/CSVFileFormat.scala | 22 +++++++++++++------- .../execution/datasources/csv/CSVSuite.scala | 13 ++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 1218f92..2ab376c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -32,7 +32,6 @@ class UnivocityGenerator( private val writerSettings = options.asWriterSettings writerSettings.setHeaders(schema.fieldNames: _*) private val gen = new CsvWriter(writer, writerSettings) - private var printHeader = options.headerFlag // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. // When the value is null, this converter should not be called. @@ -72,15 +71,15 @@ class UnivocityGenerator( values } + def writeHeaders(): Unit = { + gen.writeHeaders() + } + /** * Writes a single InternalRow to CSV using Univocity. */ def write(row: InternalRow): Unit = { - if (printHeader) { - gen.writeHeaders() - } gen.writeRow(convertRow(row): _*) - printHeader = false } def writeToString(row: InternalRow): String = { http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 4c5a1d3..f7d8a9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter( private var univocityGenerator: Option[UnivocityGenerator] = None - override def write(row: InternalRow): Unit = { - val gen = univocityGenerator.getOrElse { - val charset = Charset.forName(params.charset) - val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - val newGen = new UnivocityGenerator(dataSchema, os, params) - univocityGenerator = Some(newGen) - newGen - } + if (params.headerFlag) { + val gen = getGen() + gen.writeHeaders() + } + private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse { + val charset = Charset.forName(params.charset) + val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) + val newGen = new UnivocityGenerator(dataSchema, os, params) + univocityGenerator = Some(newGen) + newGen + } + + override def write(row: InternalRow): Unit = { + val gen = getGen() gen.write(row) } http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e14e8d4..bc950f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1987,6 +1987,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(errMsg2.contains("'lineSep' can contain only 1 character")) } + test("SPARK-26208: write and read empty data to csv file with headers") { + withTempPath { path => + val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF + // we have 2 partitions but they are both empty and will be filtered out upon writing + // thanks to SPARK-23271 one new empty partition will be inserted + df1.write.format("csv").option("header", true).save(path.getAbsolutePath) + val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false) + .load(path.getAbsolutePath) + assert(df1.schema === df2.schema) + checkAnswer(df1, df2) + } + } + test("do not produce empty files for empty partitions") { withTempPath { dir => val path = dir.getCanonicalPath --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org