Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1390eca2c -> d3110d8b9


[SPARK-15267][SQL] Refactor options for JDBC and ORC data sources and change 
default compression for ORC

## What changes were proposed in this pull request?

Currently, Parquet, JSON and CSV data sources have a class for their options
(`ParquetOptions`, `JSONOptions` and `CSVOptions`).

It is convenient to manage the options for a source by gathering them into a 
class. Currently, `JDBC`, `Text`, `libsvm` and `ORC` datasources do not have 
such a class. It would be nicer if these options were in a unified format so 
that options can be added and managed consistently.

This PR refactors the options in Spark internal data sources adding new 
classes, `OrcOptions`, `TextOptions`, `JDBCOptions` and `LibSVMOptions`.

Also, this PR changes the default compression codec for ORC from `NONE` to 
`SNAPPY`.

## How was this patch tested?

Existing tests should cover the refactoring, and unit tests in 
`OrcHadoopFsRelationSuite` cover the change of the default compression codec 
for ORC.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #13048 from HyukjinKwon/SPARK-15267.

(cherry picked from commit 3ded5bc4db2badc9ff49554e73421021d854306b)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3110d8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3110d8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3110d8b

Branch: refs/heads/branch-2.0
Commit: d3110d8b943b1af2cd44a6408036fc93de1d1aa9
Parents: 1390eca
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Fri May 13 09:04:37 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri May 13 09:04:45 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  2 +-
 .../datasources/jdbc/DefaultSource.scala        | 26 +++++-----
 .../datasources/jdbc/JDBCOptions.scala          | 39 +++++++++++++++
 .../datasources/parquet/ParquetOptions.scala    |  7 ++-
 .../apache/spark/sql/hive/orc/OrcOptions.scala  | 52 ++++++++++++++++++++
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 46 +++++------------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 18 +++++--
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  8 +--
 8 files changed, 135 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 68a855c..39bdd1a 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -32,7 +32,7 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
index 6ff50a3..6609e5d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -30,30 +30,26 @@ class DefaultSource extends RelationProvider with 
DataSourceRegister {
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
-    val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' 
not specified"))
-    val partitionColumn = parameters.getOrElse("partitionColumn", null)
-    val lowerBound = parameters.getOrElse("lowerBound", null)
-    val upperBound = parameters.getOrElse("upperBound", null)
-    val numPartitions = parameters.getOrElse("numPartitions", null)
-
-    if (partitionColumn != null
-      && (lowerBound == null || upperBound == null || numPartitions == null)) {
+    val jdbcOptions = new JDBCOptions(parameters)
+    if (jdbcOptions.partitionColumn != null
+      && (jdbcOptions.lowerBound == null
+        || jdbcOptions.upperBound == null
+        || jdbcOptions.numPartitions == null)) {
       sys.error("Partitioning incompletely specified")
     }
 
-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
       null
     } else {
       JDBCPartitioningInfo(
-        partitionColumn,
-        lowerBound.toLong,
-        upperBound.toLong,
-        numPartitions.toInt)
+        jdbcOptions.partitionColumn,
+        jdbcOptions.lowerBound.toLong,
+        jdbcOptions.upperBound.toLong,
+        jdbcOptions.numPartitions.toInt)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     val properties = new Properties() // Additional properties that we will 
pass to getConnection
     parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-    JDBCRelation(url, table, parts, properties)(sqlContext.sparkSession)
+    JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, 
properties)(sqlContext.sparkSession)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
new file mode 100644
index 0000000..6c6ec89
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+/**
+ * Options for the JDBC data source.
+ */
+private[jdbc] class JDBCOptions(
+    @transient private val parameters: Map[String, String])
+  extends Serializable {
+
+  // a JDBC URL
+  val url = parameters.getOrElse("url", sys.error("Option 'url' not 
specified"))
+  // name of table
+  val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not 
specified"))
+  // the column used to partition
+  val partitionColumn = parameters.getOrElse("partitionColumn", null)
+  // the lower bound of partition column
+  val lowerBound = parameters.getOrElse("lowerBound", null)
+  // the upper bound of the partition column
+  val upperBound = parameters.getOrElse("upperBound", null)
+  // the number of partitions
+  val numPartitions = parameters.getOrElse("numPartitions", null)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 00352f2..1ff217c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the Parquet data source.
  */
-class ParquetOptions(
+private[parquet] class ParquetOptions(
     @transient private val parameters: Map[String, String],
     @transient private val sqlConf: SQLConf)
-  extends Logging with Serializable {
+  extends Serializable {
 
   import ParquetOptions._
 
@@ -48,7 +47,7 @@ class ParquetOptions(
 }
 
 
-object ParquetOptions {
+private[parquet] object ParquetOptions {
   // The parquet compression short names
   private val shortParquetCompressionCodecNames = Map(
     "none" -> CompressionCodecName.UNCOMPRESSED,

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
new file mode 100644
index 0000000..91cf0dc
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+/**
+ * Options for the ORC data source.
+ */
+private[orc] class OrcOptions(
+    @transient private val parameters: Map[String, String])
+  extends Serializable {
+
+  import OrcOptions._
+
+  /**
+   * Compression codec to use. By default snappy compression.
+   * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
+   */
+  val compressionCodec: String = {
+    val codecName = parameters.getOrElse("compression", "snappy").toLowerCase
+    if (!shortOrcCompressionCodecNames.contains(codecName)) {
+      val availableCodecs = 
shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+      throw new IllegalArgumentException(s"Codec [$codecName] " +
+        s"is not available. Available codecs are ${availableCodecs.mkString(", 
")}.")
+    }
+    shortOrcCompressionCodecNames(codecName)
+  }
+}
+
+private[orc] object OrcOptions {
+  // The ORC compression short names
+  private val shortOrcCompressionCodecNames = Map(
+    "none" -> "NONE",
+    "uncompressed" -> "NONE",
+    "snappy" -> "SNAPPY",
+    "zlib" -> "ZLIB",
+    "lzo" -> "LZO")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index fed3150..6e55137 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc._
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
 import 
org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, 
StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
@@ -37,7 +36,6 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
@@ -66,28 +64,12 @@ private[sql] class DefaultSource
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val compressionCodec: Option[String] = options
-        .get("compression")
-        .map { codecName =>
-          // Validate if given compression codec is supported or not.
-          val shortOrcCompressionCodecNames = 
OrcRelation.shortOrcCompressionCodecNames
-          if (!shortOrcCompressionCodecNames.contains(codecName.toLowerCase)) {
-            val availableCodecs = 
shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
-            throw new IllegalArgumentException(s"Codec [$codecName] " +
-                s"is not available. Available codecs are 
${availableCodecs.mkString(", ")}.")
-          }
-          codecName.toLowerCase
-        }
+    val orcOptions = new OrcOptions(options)
 
-    compressionCodec.foreach { codecName =>
-      job.getConfiguration.set(
-        OrcTableProperties.COMPRESSION.getPropName,
-        OrcRelation
-          .shortOrcCompressionCodecNames
-          .getOrElse(codecName, CompressionKind.NONE).name())
-    }
+    val configuration = job.getConfiguration
 
-    job.getConfiguration match {
+    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
       case conf =>
@@ -205,7 +187,7 @@ private[orc] class OrcOutputWriter(
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = 
bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
     val compressionExtension = {
-      val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+      val name = conf.get(OrcRelation.ORC_COMPRESSION)
       OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
     }
     // It has the `.orc` extension at the end because (de)compression tools
@@ -329,21 +311,15 @@ private[orc] object OrcTableScan {
 }
 
 private[orc] object OrcRelation extends HiveInspectors {
-  // The ORC compression short names
-  val shortOrcCompressionCodecNames = Map(
-    "none" -> CompressionKind.NONE,
-    "uncompressed" -> CompressionKind.NONE,
-    "snappy" -> CompressionKind.SNAPPY,
-    "zlib" -> CompressionKind.ZLIB,
-    "lzo" -> CompressionKind.LZO)
+  // The references of Hive's classes will be minimized.
+  val ORC_COMPRESSION = "orc.compress"
 
   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
-    CompressionKind.NONE.name -> "",
-    CompressionKind.SNAPPY.name -> ".snappy",
-    CompressionKind.ZLIB.name -> ".zlib",
-    CompressionKind.LZO.name -> ".lzo"
-  )
+    "NONE" -> "",
+    "SNAPPY" -> ".snappy",
+    "ZLIB" -> ".zlib",
+    "LZO" -> ".lzo")
 
   def unwrapOrcStructs(
       conf: Configuration,

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 965680f..0207b4e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.orc
 import java.io.File
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
@@ -98,9 +97,10 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new 
File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
-      val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
-      val orcReader = OrcFile.createReader(orcFilePath, 
OrcFile.readerOptions(conf))
-      assert(orcReader.getCompression == CompressionKind.ZLIB)
+      val orcFilePath = maybeOrcFile.get.toPath.toString
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())
 
       val copyDf = spark
         .read
@@ -108,4 +108,14 @@ class OrcHadoopFsRelationSuite extends 
HadoopFsRelationTest {
       checkAnswer(df, copyDf)
     }
   }
+
+  test("Default compression codec is snappy for ORC compression") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("SNAPPY" === expectedCompressionKind.name())
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3110d8b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 084546f..9a08858 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -171,7 +171,7 @@ class OrcQuerySuite extends QueryTest with 
BeforeAndAfterAll with OrcTest {
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and 
NONE)") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "ZLIB")
+        .option("compression", "ZLIB")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -180,7 +180,7 @@ class OrcQuerySuite extends QueryTest with 
BeforeAndAfterAll with OrcTest {
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "SNAPPY")
+        .option("compression", "SNAPPY")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -189,7 +189,7 @@ class OrcQuerySuite extends QueryTest with 
BeforeAndAfterAll with OrcTest {
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "NONE")
+        .option("compression", "NONE")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -201,7 +201,7 @@ class OrcQuerySuite extends QueryTest with 
BeforeAndAfterAll with OrcTest {
   ignore("LZO compression options for writing to an ORC file not supported in 
Hive 1.2.1") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "LZO")
+        .option("compression", "LZO")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to