Repository: spark
Updated Branches:
  refs/heads/master 26ac60806 -> 9812a24aa


[SPARK-13503][SQL] Support to specify the (writing) option for compression codec for TEXT

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13503
This PR lets the TEXT datasource compress its output via a write option instead of
requiring Hadoop configurations to be set manually.
The resolution of codec short names to codec classes follows the approach of
https://github.com/apache/spark/pull/10805 and
https://github.com/apache/spark/pull/10858.
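
For illustration, a minimal usage sketch of the new option (the paths are hypothetical):

```scala
// Write gzip-compressed text output via the new option instead of setting
// Hadoop configurations by hand. The option value is case-insensitive,
// so "gZiP" works as well.
val df = sqlContext.read.text("/path/to/input.txt")
df.write
  .option("compression", "gzip") // short name resolved to GzipCodec
  .text("/path/to/output")
```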

## How was this patch tested?

This was tested with unit tests and with `dev/run_tests` for coding style.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #11384 from HyukjinKwon/SPARK-13503.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9812a24a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9812a24a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9812a24a

Branch: refs/heads/master
Commit: 9812a24aa80713cd7a8d89f6abe5aadf1e567cc2
Parents: 26ac608
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu Feb 25 23:57:29 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Feb 25 23:57:29 2016 -0800

----------------------------------------------------------------------
 .../datasources/CompressionCodecs.scala         | 14 ++++++++
 .../execution/datasources/csv/CSVRelation.scala | 36 +++++++++-----------
 .../datasources/json/JSONRelation.scala         |  6 +---
 .../datasources/text/DefaultSource.scala        | 13 +++++--
 .../execution/datasources/text/TextSuite.scala  | 15 ++++++++
 5 files changed, 57 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index e683a95..bc8ef4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
 
 import org.apache.spark.util.Utils
@@ -44,4 +46,16 @@ private[datasources] object CompressionCodecs {
           s"is not available. Known codecs are 
${shortCompressionCodecNames.keys.mkString(", ")}.")
     }
   }
+
+  /**
+   * Set compression-related configurations on a Hadoop `Configuration`.
+   * `codec` should be a fully qualified class name.
+   */
+  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
+    conf.set("mapreduce.output.fileoutputformat.compress", "true")
+    conf.set("mapreduce.output.fileoutputformat.compress.type", 
CompressionType.BLOCK.toString)
+    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+    conf.set("mapreduce.map.output.compress", "true")
+    conf.set("mapreduce.map.output.compress.codec", codec)
+  }
 }
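
For context, a minimal sketch (not part of this commit) of how a datasource
combines the two helpers above; the `Job` instance here is illustrative:

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.datasources.CompressionCodecs

// Resolve a short name such as "gzip" to the codec's fully qualified class
// name, then apply the output/map compression settings to the job.
val job = Job.getInstance()
val codec = CompressionCodecs.getCodecClassName("gzip")
CompressionCodecs.setCodecConfiguration(job.getConfiguration, codec)
```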

http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index da945c4..e9afee1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -24,7 +24,6 @@ import scala.util.control.NonFatal
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
-import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.RecordWriter
@@ -34,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -50,16 +50,16 @@ private[sql] class CSVRelation(
     case None => inferSchema(paths)
   }
 
-  private val params = new CSVOptions(parameters)
+  private val options = new CSVOptions(parameters)
 
   @transient
   private var cachedRDD: Option[RDD[String]] = None
 
   private def readText(location: String): RDD[String] = {
-    if (Charset.forName(params.charset) == Charset.forName("UTF-8")) {
+    if (Charset.forName(options.charset) == Charset.forName("UTF-8")) {
       sqlContext.sparkContext.textFile(location)
     } else {
-      val charset = params.charset
+      val charset = options.charset
      sqlContext.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](location)
         .mapPartitions { _.map { pair =>
             new String(pair._2.getBytes, 0, pair._2.getLength, charset)
@@ -81,8 +81,8 @@ private[sql] class CSVRelation(
   private def tokenRdd(header: Array[String], inputPaths: Array[String]): RDD[Array[String]] = {
     val rdd = baseRdd(inputPaths)
     // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (params.headerFlag) findFirstLine(rdd) else null
-    CSVRelation.univocityTokenizer(rdd, header, firstLine, params)
+    val firstLine = if (options.headerFlag) findFirstLine(rdd) else null
+    CSVRelation.univocityTokenizer(rdd, header, firstLine, options)
   }
 
   /**
@@ -96,20 +96,16 @@ private[sql] class CSVRelation(
     val pathsString = inputs.map(_.getPath.toUri.toString)
     val header = schema.fields.map(_.name)
     val tokenizedRdd = tokenRdd(header, pathsString)
-    CSVRelation.parseCsv(tokenizedRdd, schema, requiredColumns, inputs, sqlContext, params)
+    CSVRelation.parseCsv(tokenizedRdd, schema, requiredColumns, inputs, sqlContext, options)
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
-    params.compressionCodec.foreach { codec =>
-      conf.set("mapreduce.output.fileoutputformat.compress", "true")
-      conf.set("mapreduce.output.fileoutputformat.compress.type", 
CompressionType.BLOCK.toString)
-      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
-      conf.set("mapreduce.map.output.compress", "true")
-      conf.set("mapreduce.map.output.compress.codec", codec)
+    options.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
     }
 
-    new CSVOutputWriterFactory(params)
+    new CSVOutputWriterFactory(options)
   }
 
   override def hashCode(): Int = Objects.hashCode(paths.toSet, dataSchema, schema, partitionColumns)
@@ -129,17 +125,17 @@ private[sql] class CSVRelation(
   private def inferSchema(paths: Array[String]): StructType = {
     val rdd = baseRdd(paths)
     val firstLine = findFirstLine(rdd)
-    val firstRow = new LineCsvReader(params).parseLine(firstLine)
+    val firstRow = new LineCsvReader(options).parseLine(firstLine)
 
-    val header = if (params.headerFlag) {
+    val header = if (options.headerFlag) {
       firstRow
     } else {
       firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
     }
 
     val parsedRdd = tokenRdd(header, paths)
-    if (params.inferSchemaFlag) {
-      CSVInferSchema.infer(parsedRdd, header, params.nullValue)
+    if (options.inferSchemaFlag) {
+      CSVInferSchema.infer(parsedRdd, header, options.nullValue)
     } else {
       // By default fields are assumed to be StringType
       val schemaFields = header.map { fieldName =>
@@ -153,8 +149,8 @@ private[sql] class CSVRelation(
     * Returns the first line of the first non-empty file in path
     */
   private def findFirstLine(rdd: RDD[String]): String = {
-    if (params.isCommentSet) {
-      val comment = params.comment.toString
+    if (options.isCommentSet) {
+      val comment = options.comment.toString
       rdd.filter { line =>
         line.trim.nonEmpty && !line.startsWith(comment)
       }.first()

http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index c893558..2813691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -165,11 +165,7 @@ private[sql] class JSONRelation(
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
     val conf = job.getConfiguration
     options.compressionCodec.foreach { codec =>
-      conf.set("mapreduce.output.fileoutputformat.compress", "true")
-      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
-      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
-      conf.set("mapreduce.map.output.compress", "true")
-      conf.set("mapreduce.map.output.compress.codec", codec)
+      CompressionCodecs.setCodecConfiguration(conf, codec)
     }
 
     new BucketedOutputWriterFactory {

http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 430257f..60155b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, PartitionSpec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -48,7 +48,7 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
     dataSchema.foreach(verifySchema)
-    new TextRelation(None, dataSchema, partitionColumns, paths)(sqlContext)
+    new TextRelation(None, dataSchema, partitionColumns, paths, parameters)(sqlContext)
   }
 
   override def shortName(): String = "text"
@@ -114,6 +114,15 @@ private[sql] class TextRelation(
 
   /** Write path. */
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val compressionCodec = {
+      val name = parameters.get("compression").orElse(parameters.get("codec"))
+      name.map(CompressionCodecs.getCodecClassName)
+    }
+    compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
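
As the hunk above shows, the TEXT writer accepts either `compression` or
`codec` as the option key; a brief sketch (the output path is illustrative):

```scala
// "codec" is accepted as an alias for "compression"; both reach the same
// short-name resolution path.
df.write.option("codec", "bzip2").text("/path/to/output-bzip2")
```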

http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f952725..6ae42a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -57,6 +57,21 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-13503 Support to specify the option for compression codec for 
TEXT") {
+    val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf")
+
+    val tempFile = Utils.createTempDir()
+    tempFile.delete()
+    df.write
+      .option("compression", "gZiP")
+      .text(tempFile.getCanonicalPath)
+    val compressedFiles = tempFile.listFiles()
+    assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+
+    Utils.deleteRecursively(tempFile)
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }

