Repository: spark Updated Branches: refs/heads/master 26ac60806 -> 9812a24aa
[SPARK-13503][SQL] Support to specify the (writing) option for compression codec for TEXT ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-13503 This PR makes the TEXT datasource able to compress output by option instead of manually setting Hadoop configurations. For resolving codecs by name, it is similar to https://github.com/apache/spark/pull/10805 and https://github.com/apache/spark/pull/10858. ## How was this patch tested? This was tested with unit tests and with `dev/run_tests` for coding style. Author: hyukjinkwon <gurwls...@gmail.com> Closes #11384 from HyukjinKwon/SPARK-13503. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9812a24a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9812a24a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9812a24a Branch: refs/heads/master Commit: 9812a24aa80713cd7a8d89f6abe5aadf1e567cc2 Parents: 26ac608 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Thu Feb 25 23:57:29 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Feb 25 23:57:29 2016 -0800 ---------------------------------------------------------------------- .../datasources/CompressionCodecs.scala | 14 ++++++++ .../execution/datasources/csv/CSVRelation.scala | 36 +++++++++----------- .../datasources/json/JSONRelation.scala | 6 +--- .../datasources/text/DefaultSource.scala | 13 +++++-- .../execution/datasources/text/TextSuite.scala | 15 ++++++++ 5 files changed, 57 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala index e683a95..bc8ef4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec} import org.apache.spark.util.Utils @@ -44,4 +46,16 @@ private[datasources] object CompressionCodecs { s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.") } } + + /** + * Set compression configurations to Hadoop `Configuration`. + * `codec` should be a full class path + */ + def setCodecConfiguration(conf: Configuration, codec: String): Unit = { + conf.set("mapreduce.output.fileoutputformat.compress", "true") + conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) + conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) + conf.set("mapreduce.map.output.compress", "true") + conf.set("mapreduce.map.output.compress.codec", codec) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index da945c4..e9afee1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -24,7 +24,6 @@ import scala.util.control.NonFatal import 
com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} -import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.hadoop.mapreduce.RecordWriter @@ -34,6 +33,7 @@ import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.CompressionCodecs import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -50,16 +50,16 @@ private[sql] class CSVRelation( case None => inferSchema(paths) } - private val params = new CSVOptions(parameters) + private val options = new CSVOptions(parameters) @transient private var cachedRDD: Option[RDD[String]] = None private def readText(location: String): RDD[String] = { - if (Charset.forName(params.charset) == Charset.forName("UTF-8")) { + if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { sqlContext.sparkContext.textFile(location) } else { - val charset = params.charset + val charset = options.charset sqlContext.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](location) .mapPartitions { _.map { pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset) @@ -81,8 +81,8 @@ private[sql] class CSVRelation( private def tokenRdd(header: Array[String], inputPaths: Array[String]): RDD[Array[String]] = { val rdd = baseRdd(inputPaths) // Make sure firstLine is materialized before sending to executors - val firstLine = if (params.headerFlag) findFirstLine(rdd) else null - CSVRelation.univocityTokenizer(rdd, header, firstLine, params) + val firstLine = if (options.headerFlag) findFirstLine(rdd) else null + CSVRelation.univocityTokenizer(rdd, header, firstLine, options) } /** @@ -96,20 +96,16 @@ private[sql] class CSVRelation( val pathsString = 
inputs.map(_.getPath.toUri.toString) val header = schema.fields.map(_.name) val tokenizedRdd = tokenRdd(header, pathsString) - CSVRelation.parseCsv(tokenizedRdd, schema, requiredColumns, inputs, sqlContext, params) + CSVRelation.parseCsv(tokenizedRdd, schema, requiredColumns, inputs, sqlContext, options) } override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = job.getConfiguration - params.compressionCodec.foreach { codec => - conf.set("mapreduce.output.fileoutputformat.compress", "true") - conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) - conf.set("mapreduce.map.output.compress", "true") - conf.set("mapreduce.map.output.compress.codec", codec) + options.compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) } - new CSVOutputWriterFactory(params) + new CSVOutputWriterFactory(options) } override def hashCode(): Int = Objects.hashCode(paths.toSet, dataSchema, schema, partitionColumns) @@ -129,17 +125,17 @@ private[sql] class CSVRelation( private def inferSchema(paths: Array[String]): StructType = { val rdd = baseRdd(paths) val firstLine = findFirstLine(rdd) - val firstRow = new LineCsvReader(params).parseLine(firstLine) + val firstRow = new LineCsvReader(options).parseLine(firstLine) - val header = if (params.headerFlag) { + val header = if (options.headerFlag) { firstRow } else { firstRow.zipWithIndex.map { case (value, index) => s"C$index" } } val parsedRdd = tokenRdd(header, paths) - if (params.inferSchemaFlag) { - CSVInferSchema.infer(parsedRdd, header, params.nullValue) + if (options.inferSchemaFlag) { + CSVInferSchema.infer(parsedRdd, header, options.nullValue) } else { // By default fields are assumed to be StringType val schemaFields = header.map { fieldName => @@ -153,8 +149,8 @@ private[sql] class CSVRelation( * Returns the first line of the first non-empty file in path */ private def 
findFirstLine(rdd: RDD[String]): String = { - if (params.isCommentSet) { - val comment = params.comment.toString + if (options.isCommentSet) { + val comment = options.comment.toString rdd.filter { line => line.trim.nonEmpty && !line.startsWith(comment) }.first() http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index c893558..2813691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -165,11 +165,7 @@ private[sql] class JSONRelation( override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = { val conf = job.getConfiguration options.compressionCodec.foreach { codec => - conf.set("mapreduce.output.fileoutputformat.compress", "true") - conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) - conf.set("mapreduce.map.output.compress", "true") - conf.set("mapreduce.map.output.compress.codec", codec) + CompressionCodecs.setCodecConfiguration(conf, codec) } new BucketedOutputWriterFactory { http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 430257f..60155b3 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} -import org.apache.spark.sql.execution.datasources.PartitionSpec +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, PartitionSpec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -48,7 +48,7 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { partitionColumns: Option[StructType], parameters: Map[String, String]): HadoopFsRelation = { dataSchema.foreach(verifySchema) - new TextRelation(None, dataSchema, partitionColumns, paths)(sqlContext) + new TextRelation(None, dataSchema, partitionColumns, paths, parameters)(sqlContext) } override def shortName(): String = "text" @@ -114,6 +114,15 @@ private[sql] class TextRelation( /** Write path. 
*/ override def prepareJobForWrite(job: Job): OutputWriterFactory = { + val conf = job.getConfiguration + val compressionCodec = { + val name = parameters.get("compression").orElse(parameters.get("codec")) + name.map(CompressionCodecs.getCodecClassName) + } + compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) + } + new OutputWriterFactory { override def newInstance( path: String, http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index f952725..6ae42a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -57,6 +57,21 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-13503 Support to specify the option for compression codec for TEXT") { + val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf") + + val tempFile = Utils.createTempDir() + tempFile.delete() + df.write + .option("compression", "gZiP") + .text(tempFile.getCanonicalPath) + val compressedFiles = tempFile.listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath)) + + Utils.deleteRecursively(tempFile) + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org