Repository: spark Updated Branches: refs/heads/master 790646125 -> aa0eba2c3
[SPARK-13766][SQL] Consistent file extensions for files written by internal data sources ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-13766 This PR makes the file extensions (written by internal data sources) consistent. **Before** - TEXT, CSV and JSON ``` [.COMPRESSION_CODEC_NAME] ``` - Parquet ``` [.COMPRESSION_CODEC_NAME].parquet ``` - ORC ``` .orc ``` **After** - TEXT, CSV and JSON ``` .txt[.COMPRESSION_CODEC_NAME] .csv[.COMPRESSION_CODEC_NAME] .json[.COMPRESSION_CODEC_NAME] ``` - Parquet ``` [.COMPRESSION_CODEC_NAME].parquet ``` - ORC ``` [.COMPRESSION_CODEC_NAME].orc ``` When the compression codec is set, - For Parquet and ORC, each file still stays in Parquet or ORC format but just has compressed data internally. So, I think it is okay to keep the names `.parquet` and `.orc` at the end. - For Text, CSV and JSON, each file does not stay in its original format but has a different on-disk format depending on the compression codec. So, each has the name `.json`, `.csv` or `.txt` before the compression extension. ## How was this patch tested? Unit tests were added, and `./dev/run_tests` was run for coding style tests. Author: hyukjinkwon <gurwls...@gmail.com> Closes #11604 from HyukjinKwon/SPARK-13766. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa0eba2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa0eba2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa0eba2c Branch: refs/heads/master Commit: aa0eba2c354dc57dd83a427daa68d6171f292a83 Parents: 7906461 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Wed Mar 9 19:12:46 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Mar 9 19:12:46 2016 -0800 ---------------------------------------------------------------------- .../execution/datasources/csv/CSVRelation.scala | 2 +- .../execution/datasources/json/JSONRelation.scala | 2 +- .../datasources/parquet/ParquetRelation.scala | 3 +++ .../execution/datasources/text/DefaultSource.scala | 2 +- .../sql/execution/datasources/csv/CSVSuite.scala | 4 ++-- .../sql/execution/datasources/json/JsonSuite.scala | 4 ++-- .../sql/execution/datasources/text/TextSuite.scala | 4 ++-- .../apache/spark/sql/hive/orc/OrcRelation.scala | 17 ++++++++++++++++- .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 2 +- 9 files changed, 29 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index d7ce9a0..0e6b985 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -140,7 +140,7 @@ private[sql] class CsvOutputWriter( val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID") val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension") } }.getRecordWriter(context) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 497e3c5..05b44d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -167,7 +167,7 @@ private[json] class JsonOutputWriter( val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension") } }.getRecordWriter(context) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 82404b8..f106007 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -379,6 +379,9 @@ private[sql] class ParquetOutputWriter( val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") + // It has the `.parquet` extension at the end because (de)compression tools + // such as gunzip would not be able to decompress this as the compression + // is not applied on this whole file but on each "page" in Parquet format. new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") } } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index b329725..2869a6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -140,7 +140,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension") } }.getRecordWriter(context) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 076fe5e..680759d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -404,7 +404,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .save(csvDir) val compressedFiles = new File(csvDir).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + assert(compressedFiles.exists(_.getName.endsWith(".csv.gz"))) val carsCopy = sqlContext.read .format("csv") @@ -439,7 +439,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .save(csvDir) val compressedFiles = new File(csvDir).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz"))) val carsCopy = sqlContext.read .format("csv") http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 02b173d..097ece3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1441,7 +1441,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .save(jsonDir) val compressedFiles = new File(jsonDir).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) val jsonCopy = sqlContext.read 
.format("json") @@ -1479,7 +1479,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .save(jsonDir) val compressedFiles = new File(jsonDir).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + assert(compressedFiles.exists(!_.getName.endsWith(".json.gz"))) val jsonCopy = sqlContext.read .format("json") http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 9eb1016..ee39872 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -74,7 +74,7 @@ class TextSuite extends QueryTest with SharedSQLContext { val tempDirPath = tempDir.getAbsolutePath testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath) val compressedFiles = new File(tempDirPath).listFiles() - assert(compressedFiles.exists(_.getName.endsWith(extension))) + assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension"))) verifyFrame(sqlContext.read.text(tempDirPath)) } @@ -102,7 +102,7 @@ class TextSuite extends QueryTest with SharedSQLContext { val tempDirPath = tempDir.getAbsolutePath testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath) val compressedFiles = new File(tempDirPath).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".gz"))) + assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz"))) verifyFrame(sqlContext.read.text(tempDirPath)) } finally { // Hadoop 1 doesn't have `Configuration.unset` 
http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 041e0fb..8a39d95 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -161,7 +161,14 @@ private[orc] class OrcOutputWriter( val taskAttemptId = context.getTaskAttemptID val partition = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc" + val compressionExtension = { + val name = conf.get(OrcTableProperties.COMPRESSION.getPropName) + OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") + } + // It has the `.orc` extension at the end because (de)compression tools + // such as gunzip would not be able to decompress this as the compression + // is not applied on this whole file but on each "stream" in ORC format. 
+ val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc" new OrcOutputFormat().getRecordWriter( new Path(path, filename).getFileSystem(conf), @@ -328,5 +335,13 @@ private[orc] object OrcRelation { "snappy" -> CompressionKind.SNAPPY, "zlib" -> CompressionKind.ZLIB, "lzo" -> CompressionKind.LZO) + + // The extensions for ORC compression codecs + val extensionsForCompressionCodecNames = Map( + CompressionKind.NONE.name -> "", + CompressionKind.SNAPPY.name -> ".snappy", + CompressionKind.ZLIB.name -> ".zlib", + CompressionKind.LZO.name -> ".lzo" + ) } http://git-wip-us.apache.org/repos/asf/spark/blob/aa0eba2c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 823b52a..2345c1c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -96,7 +96,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { // Check if this is compressed as ZLIB. val conf = sparkContext.hadoopConfiguration val fs = FileSystem.getLocal(conf) - val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".orc")) + val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) val orcFilePath = new Path(maybeOrcFile.get.toPath.toString) val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org