Repository: spark
Updated Branches:
  refs/heads/master 790646125 -> aa0eba2c3


[SPARK-13766][SQL] Consistent file extensions for files written by internal data sources

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13766
This PR makes the file extensions written by the internal data sources consistent.

**Before**

- TEXT, CSV and JSON
```
[.COMPRESSION_CODEC_NAME]
```

- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```

- ORC
```
.orc
```

**After**

- TEXT, CSV and JSON
```
.txt[.COMPRESSION_CODEC_NAME]
.csv[.COMPRESSION_CODEC_NAME]
.json[.COMPRESSION_CODEC_NAME]
```

- Parquet
```
[.COMPRESSION_CODEC_NAME].parquet
```

- ORC
```
[.COMPRESSION_CODEC_NAME].orc
```

When the compression codec is set,
- For Parquet and ORC, each file still stays in its own format; only the data inside the file is compressed. So it is okay to keep the `.parquet` and `.orc` extensions at the end of the name.

- For Text, CSV and JSON, the file does not stay in its original format; the whole file changes format according to the compression codec. So the format extensions `.txt`, `.csv` and `.json` come before the compression extension, as illustrated in the sketch below.
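
As a rough sketch of the resulting naming scheme (a hypothetical helper, not the actual writer code; `split`, the write-job UUID and the codec extension mirror the identifiers used in the writers changed below):

```scala
// Minimal sketch of the naming scheme described above (hypothetical helpers).
// For text-based sources the format extension (".txt"/".csv"/".json") goes in
// front of the codec extension; Parquet and ORC keep theirs at the very end.
def textStyleName(split: Int, jobId: String, format: String, codec: String): String =
  f"part-r-$split%05d-$jobId$format$codec"      // e.g. part-r-00000-<uuid>.csv.gz

def columnarStyleName(split: Int, jobId: String, codec: String, format: String): String =
  f"part-r-$split%05d-$jobId$codec$format"      // e.g. part-r-00000-<uuid>.zlib.orc
```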

## How was this patch tested?

Unit tests cover the new file names, and `./dev/run_tests` was run for coding style checks.
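
For a user-level check (a sketch only, assuming a running `SQLContext`, an existing DataFrame `df`, and placeholder `/tmp/...` output paths), the new extensions can be observed the same way the updated suites assert them:

```scala
import java.io.File

// Write gzip-compressed JSON and CSV output, then verify the part files carry
// the format extension before the ".gz" codec extension.
df.write.option("compression", "gzip").mode("overwrite").json("/tmp/json-out")
df.write.format("csv").option("compression", "gzip").mode("overwrite").save("/tmp/csv-out")

assert(new File("/tmp/json-out").listFiles().exists(_.getName.endsWith(".json.gz")))
assert(new File("/tmp/csv-out").listFiles().exists(_.getName.endsWith(".csv.gz")))
```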

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #11604 from HyukjinKwon/SPARK-13766.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa0eba2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa0eba2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa0eba2c

Branch: refs/heads/master
Commit: aa0eba2c354dc57dd83a427daa68d6171f292a83
Parents: 7906461
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Mar 9 19:12:46 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Mar 9 19:12:46 2016 -0800

----------------------------------------------------------------------
 .../execution/datasources/csv/CSVRelation.scala    |  2 +-
 .../execution/datasources/json/JSONRelation.scala  |  2 +-
 .../datasources/parquet/ParquetRelation.scala      |  3 +++
 .../execution/datasources/text/DefaultSource.scala |  2 +-
 .../sql/execution/datasources/csv/CSVSuite.scala   |  4 ++--
 .../sql/execution/datasources/json/JsonSuite.scala |  4 ++--
 .../sql/execution/datasources/text/TextSuite.scala |  4 ++--
 .../apache/spark/sql/hive/orc/OrcRelation.scala    | 17 ++++++++++++++++-
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala    |  2 +-
 9 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index d7ce9a0..0e6b985 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -140,7 +140,7 @@ private[sql] class CsvOutputWriter(
        val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 497e3c5..05b44d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -167,7 +167,7 @@ private[json] class JsonOutputWriter(
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 82404b8..f106007 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -379,6 +379,9 @@ private[sql] class ParquetOutputWriter(
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
           val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+          // It has the `.parquet` extension at the end because (de)compression tools
+          // such as gunzip would not be able to decompress this as the compression
+          // is not applied on this whole file but on each "page" in Parquet format.
           new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index b329725..2869a6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -140,7 +140,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 076fe5e..680759d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -404,7 +404,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(csvDir)
 
       val compressedFiles = new File(csvDir).listFiles()
-      assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+      assert(compressedFiles.exists(_.getName.endsWith(".csv.gz")))
 
       val carsCopy = sqlContext.read
         .format("csv")
@@ -439,7 +439,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           .save(csvDir)
 
         val compressedFiles = new File(csvDir).listFiles()
-        assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+        assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz")))
 
         val carsCopy = sqlContext.read
           .format("csv")

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 02b173d..097ece3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1441,7 +1441,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .save(jsonDir)
 
       val compressedFiles = new File(jsonDir).listFiles()
-      assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+      assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
 
       val jsonCopy = sqlContext.read
         .format("json")
@@ -1479,7 +1479,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           .save(jsonDir)
 
         val compressedFiles = new File(jsonDir).listFiles()
-        assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+        assert(compressedFiles.exists(!_.getName.endsWith(".json.gz")))
 
         val jsonCopy = sqlContext.read
           .format("json")

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 9eb1016..ee39872 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -74,7 +74,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         val tempDirPath = tempDir.getAbsolutePath
        testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
-        assert(compressedFiles.exists(_.getName.endsWith(extension)))
+        assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
         verifyFrame(sqlContext.read.text(tempDirPath))
     }
 
@@ -102,7 +102,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         val tempDirPath = tempDir.getAbsolutePath
        testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
-        assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+        assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
         verifyFrame(sqlContext.read.text(tempDirPath))
       } finally {
         // Hadoop 1 doesn't have `Configuration.unset`

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 041e0fb..8a39d95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -161,7 +161,14 @@ private[orc] class OrcOutputWriter(
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
+    val compressionExtension = {
+      val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+      OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+    }
+    // It has the `.orc` extension at the end because (de)compression tools
+    // such as gunzip would not be able to decompress this as the compression
+    // is not applied on this whole file but on each "stream" in ORC format.
+    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
 
     new OrcOutputFormat().getRecordWriter(
       new Path(path, filename).getFileSystem(conf),
@@ -328,5 +335,13 @@ private[orc] object OrcRelation {
     "snappy" -> CompressionKind.SNAPPY,
     "zlib" -> CompressionKind.ZLIB,
     "lzo" -> CompressionKind.LZO)
+
+  // The extensions for ORC compression codecs
+  val extensionsForCompressionCodecNames = Map(
+    CompressionKind.NONE.name -> "",
+    CompressionKind.SNAPPY.name -> ".snappy",
+    CompressionKind.ZLIB.name -> ".zlib",
+    CompressionKind.LZO.name -> ".lzo"
+  )
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 823b52a..2345c1c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -96,7 +96,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       // Check if this is compressed as ZLIB.
       val conf = sparkContext.hadoopConfiguration
       val fs = FileSystem.getLocal(conf)
-      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".orc"))
+      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
       val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
      val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))

