Repository: spark
Updated Branches:
  refs/heads/master ea5c38fe7 -> 5af5a0216
[SPARK-12872][SQL] Support to specify the option for compression codec for JSON datasource

https://issues.apache.org/jira/browse/SPARK-12872

This PR lets the JSON datasource compress its output through an option, instead of requiring users to set the Hadoop compression configurations manually. Codec names are resolved in the same way as in https://github.com/apache/spark/pull/10805. Since `CSVCompressionCodecs` can be shared with other datasources, it was extracted into a shared object, `CompressionCodecs`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #10858 from HyukjinKwon/SPARK-12872.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5af5a021
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5af5a021
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5af5a021

Branch: refs/heads/master
Commit: 5af5a02160b42115579003b749c4d1831bf9d48e
Parents: ea5c38f
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Fri Jan 22 23:53:12 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Jan 22 23:53:12 2016 -0800

----------------------------------------------------------------------
 .../datasources/CompressionCodecs.scala        | 47 ++++++++++++++++++++
 .../datasources/csv/CSVParameters.scala        | 28 +----------
 .../datasources/json/JSONOptions.scala         | 12 +++--
 .../datasources/json/JSONRelation.scala        | 10 +++++
 .../execution/datasources/json/JsonSuite.scala | 28 ++++++++++++
 5 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
new file mode 100644
index 0000000..e683a95
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+
+import org.apache.spark.util.Utils
+
+private[datasources] object CompressionCodecs {
+  private val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
+
+  /**
+   * Return the fully qualified class name of the given codec.
+   * If the name is already a class name, just return it.
+   */
+  def getCodecClassName(name: String): String = {
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    try {
+      // Validate the codec name
+      Utils.classForName(codecName)
+      codecName
+    } catch {
+      case e: ClassNotFoundException =>
+        throw new IllegalArgumentException(s"Codec [$codecName] " +
+          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+    }
+  }
+}
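
For reference, here is how the lookup in `getCodecClassName` behaves. This is an illustrative sketch only (the object is `private[datasources]`, so it is not callable from user code), and the exception message is paraphrased from the code above:

    // Short names are lower-cased before the map lookup, so any casing works.
    CompressionCodecs.getCodecClassName("gzip")
    // => "org.apache.hadoop.io.compress.GzipCodec"

    // A fully qualified class name misses the map and passes through
    // unchanged, validated only by Utils.classForName.
    CompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.GzipCodec")
    // => "org.apache.hadoop.io.compress.GzipCodec"

    // An unknown name fails fast with the supported short names listed:
    CompressionCodecs.getCodecClassName("zstd")
    // => IllegalArgumentException: Codec [zstd] is not available.
    //    Known codecs are bzip2, gzip, lz4, snappy.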
http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 676a3d3..0278675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -22,6 +22,7 @@ import java.nio.charset.Charset
 import org.apache.hadoop.io.compress._
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
 import org.apache.spark.util.Utils
 
 private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -78,7 +79,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
 
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CSVCompressionCodecs.getCodecClassName)
+    name.map(CompressionCodecs.getCodecClassName)
   }
 
   val maxColumns = 20480
@@ -114,28 +115,3 @@ private[csv] object ParseModes {
     true // We default to permissive is the mode string is not valid
   }
 }
-
-private[csv] object CSVCompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
-
-  /**
-   * Return the full version of the given codec class.
-   * If it is already a class name, just return it.
-   */
-  def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
-    try {
-      // Validate the codec name
-      Utils.classForName(codecName)
-      codecName
-    } catch {
-      case e: ClassNotFoundException =>
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
-    }
-  }
-}


http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index aee9cf2..e74a76c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
+
 /**
  * Options for the JSON data source.
  *
@@ -32,7 +34,8 @@ case class JSONOptions(
     allowSingleQuotes: Boolean = true,
     allowNumericLeadingZeros: Boolean = false,
     allowNonNumericNumbers: Boolean = false,
-    allowBackslashEscapingAnyCharacter: Boolean = false) {
+    allowBackslashEscapingAnyCharacter: Boolean = false,
+    compressionCodec: Option[String] = None) {
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
@@ -46,7 +49,6 @@ case class JSONOptions(
   }
 }
 
-
 object JSONOptions {
   def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
     samplingRatio =
@@ -64,6 +66,10 @@
     allowNonNumericNumbers =
       parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true),
     allowBackslashEscapingAnyCharacter =
-      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false),
+    compressionCodec = {
+      val name = parameters.get("compression").orElse(parameters.get("codec"))
+      name.map(CompressionCodecs.getCodecClassName)
+    }
   )
 }
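
To illustrate how the option reaches `JSONOptions` (a sketch against the internal API above, not a user-facing call):

    // "compression" is checked first; "codec" is accepted as an alias.
    val opts = JSONOptions.createFromConfigMap(Map("compression" -> "gzip"))
    // opts.compressionCodec == Some("org.apache.hadoop.io.compress.GzipCodec")

    val opts2 = JSONOptions.createFromConfigMap(Map("codec" -> "snappy"))
    // opts2.compressionCodec == Some("org.apache.hadoop.io.compress.SnappyCodec")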
http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 31c5620..93727ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -162,6 +163,15 @@ private[sql] class JSONRelation(
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+    val conf = job.getConfiguration
+    options.compressionCodec.foreach { codec =>
+      conf.set("mapreduce.output.fileoutputformat.compress", "true")
+      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+      conf.set("mapreduce.map.output.compress", "true")
+      conf.set("mapreduce.map.output.compress.codec", codec)
+    }
+
     new BucketedOutputWriterFactory {
       override def newInstance(
           path: String,
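
For context, these are the same Hadoop properties users previously had to set by hand before writing. Roughly (a sketch, assuming a SparkContext named sc):

    // Pre-SPARK-12872 workaround: configure output compression globally on
    // the shared Hadoop configuration rather than per write via an option.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec",
      classOf[org.apache.hadoop.io.compress.GzipCodec].getName)

The per-write option avoids mutating this shared, session-wide configuration.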
http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a3c6a1d..d22fa79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1467,6 +1467,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-12872 Support to specify the option for compression codec") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val jsonDF = sqlContext.read.json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .option("compression", "gZiP")
+        .save(jsonDir)
+
+      val compressedFiles = new File(jsonDir).listFiles()
+      assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+
+      val jsonCopy = sqlContext.read
+        .format("json")
+        .load(jsonDir)
+
+      assert(jsonCopy.count == jsonDF.count)
+      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+      checkAnswer(jsonCopySome, jsonDFSome)
+    }
+  }
+
   test("Casting long as timestamp") {
     withTempTable("jsonTable") {
       val schema = (new StructType).add("ts", TimestampType)
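
In user-facing terms, the SPARK-12872 test above boils down to the following usage (a sketch; df, sqlContext, and the output path are assumed):

    // Write gzip-compressed JSON. The short name is matched
    // case-insensitively (hence "gZiP" in the test), and "codec" is
    // accepted as an alias for "compression".
    df.write
      .format("json")
      .option("compression", "gzip")
      .save("/tmp/df-json-gz")

    // Reading it back needs no extra option: the .gz part files are
    // decompressed transparently by Hadoop's TextInputFormat.
    val roundTrip = sqlContext.read.format("json").load("/tmp/df-json-gz")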