Repository: spark
Updated Branches:
  refs/heads/branch-1.5 3d247672b -> 92e8acc98


[SPARK-6923] [SPARK-7550] [SQL] Persists data source relations in Hive 
compatible format when possible

This PR is a fork of PR #5733 authored by chenghao-intel.  For committers who are 
going to merge this PR, please set the author to "Cheng Hao 
<hao.chengintel.com>".

----

When a data source relation meets the following requirements, we persist it in 
Hive compatible format, so that other systems like Hive can access it:

1. It's a `HadoopFsRelation`
2. It has only one input path
3. It's non-partitioned
4. Its data source provider can be naturally mapped to a Hive builtin SerDe 
(e.g. ORC and Parquet)

Author: Cheng Lian <l...@databricks.com>
Author: Cheng Hao <hao.ch...@intel.com>

Closes #7967 from liancheng/spark-6923/refactoring-pr-5733 and squashes the 
following commits:

5175ee6 [Cheng Lian] Fixes an outdated comment
3870166 [Cheng Lian] Fixes build error and comments
864acee [Cheng Lian] Refactors PR #5733
3490cdc [Cheng Hao] update the scaladoc
6f57669 [Cheng Hao] write schema info to hivemetastore for data source

(cherry picked from commit 119b59053870df7be899bf5c1c0d321406af96f9)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92e8acc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92e8acc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92e8acc9

Branch: refs/heads/branch-1.5
Commit: 92e8acc98935f0e1a3a7889e0498b8d91be8f6f6
Parents: 3d24767
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Thu Aug 6 11:13:44 2015 +0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Aug 6 13:21:48 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  53 +++++--
 .../org/apache/spark/sql/DataFrameWriter.scala  |   7 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 146 ++++++++++++++++++-
 .../org/apache/spark/sql/hive/HiveQl.scala      |  49 ++-----
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   6 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    | 133 +++++++++++++++--
 6 files changed, 324 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index e57acec..405b5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql
 import java.io.CharArrayWriter
 import java.util.Properties
 
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.unsafe.types.UTF8String
-
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -42,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
-import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
+import org.apache.spark.sql.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
@@ -1650,8 +1647,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`.
    */
@@ -1669,8 +1670,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by 
`write().mode(mode).saveAsTable(tableName)`.
    */
@@ -1689,8 +1694,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by 
`write().format(source).saveAsTable(tableName)`.
    */
@@ -1709,8 +1718,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by 
`write().mode(mode).saveAsTable(tableName)`.
    */
@@ -1728,8 +1741,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by
    *            
`write().format(source).mode(mode).options(options).saveAsTable(tableName)`.
@@ -1754,8 +1771,12 @@ class DataFrame private[sql](
    * an RDD out to a parquet file, and then register that file as a table.  
This "table" can then
    * be the target of an `insertInto`.
    *
-   * Also note that while this function can persist the table metadata into 
Hive's metastore,
-   * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @group output
    * @deprecated As of 1.4.0, replaced by
    *            
`write().format(source).mode(mode).options(options).saveAsTable(tableName)`.

http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7e3318c..2a4992d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
ResolvedDataSource}
 import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
+import org.apache.spark.sql.sources.HadoopFsRelation
 
 
 /**
@@ -185,6 +186,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * When `mode` is `Append`, the schema of the [[DataFrame]] need to be
    * the same as that of the existing table, and format or options will be 
ignored.
    *
+   * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] 
with a single input
+   * path, and the data source provider can be mapped to an existing Hive 
builtin SerDe (i.e. ORC
+   * and Parquet), the table is persisted in a Hive compatible format, which 
means other systems
+   * like Hive will be able to read this table. Otherwise, the table is 
persisted in a Spark SQL
+   * specific format.
+   *
    * @since 1.4.0
    */
   def saveAsTable(tableName: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b37af9..1523ebe 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.hive
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.Warehouse
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata._
@@ -40,9 +42,59 @@ import org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation, Partition => ParquetPartition, PartitionSpec, 
ResolvedDataSource}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 
+private[hive] case class HiveSerDe(
+    inputFormat: Option[String] = None,
+    outputFormat: Option[String] = None,
+    serde: Option[String] = None)
+
+private[hive] object HiveSerDe {
+  /**
+   * Get the Hive SerDe information from the data source abbreviation string 
or classname.
+   *
+   * @param source Currently the source abbreviation can be one of the 
following:
+   *               SequenceFile, RCFile, ORC, PARQUET, and case insensitive.
+   * @param hiveConf Hive Conf
+   * @return HiveSerDe associated with the specified source
+   */
+  def sourceToSerDe(source: String, hiveConf: HiveConf): Option[HiveSerDe] = {
+    val serdeMap = Map(
+      "sequencefile" ->
+        HiveSerDe(
+          inputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
+
+      "rcfile" ->
+        HiveSerDe(
+          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+          serde = 
Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))),
+
+      "orc" ->
+        HiveSerDe(
+          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+
+      "parquet" ->
+        HiveSerDe(
+          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+          serde = 
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
+
+    val key = source.toLowerCase match {
+      case _ if source.startsWith("org.apache.spark.sql.parquet") => "parquet"
+      case _ if source.startsWith("org.apache.spark.sql.orc") => "orc"
+      case _ => source.toLowerCase
+    }
+
+    serdeMap.get(key)
+  }
+}
+
 private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: 
HiveContext)
   extends Catalog with Logging {
 
@@ -164,15 +216,15 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       processDatabaseAndTableName(database, tableIdent.table)
     }
 
-    val tableProperties = new scala.collection.mutable.HashMap[String, String]
+    val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
 
     // Saves optional user specified schema.  Serialized JSON schema string 
may be too long to be
     // stored into a single metastore SerDe property.  In this case, we split 
the JSON string and
     // store each part as a separate SerDe property.
-    if (userSpecifiedSchema.isDefined) {
+    userSpecifiedSchema.foreach { schema =>
       val threshold = conf.schemaStringLengthThreshold
-      val schemaJsonString = userSpecifiedSchema.get.json
+      val schemaJsonString = schema.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
       tableProperties.put("spark.sql.sources.schema.numParts", 
parts.size.toString)
@@ -194,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         // The table does not have a specified schema, which means that the 
schema will be inferred
         // when we load the table. So, we are not expecting partition columns 
and we will discover
         // partitions when we load the table. However, if there are specified 
partition columns,
-        // we simplily ignore them and provide a warning message..
+        // we simply ignore them and provide a warning message.
         logWarning(
           s"The schema and partitions of table $tableIdent will be inferred 
when it is loaded. " +
             s"Specified partition columns (${partitionColumns.mkString(",")}) 
will be ignored.")
@@ -210,7 +262,11 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       ManagedTable
     }
 
-    client.createTable(
+    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
+    val dataSource = ResolvedDataSource(
+      hive, userSpecifiedSchema, partitionColumns, provider, options)
+
+    def newSparkSQLSpecificMetastoreTable(): HiveTable = {
       HiveTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
@@ -218,7 +274,83 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         partitionColumns = metastorePartitionColumns,
         tableType = tableType,
         properties = tableProperties.toMap,
-        serdeProperties = options))
+        serdeProperties = options)
+    }
+
+    def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: 
HiveSerDe): HiveTable = {
+      def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
+        schema.map { field =>
+          HiveColumn(
+            name = field.name,
+            hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
+            comment = "")
+        }
+      }
+
+      val partitionColumns = schemaToHiveColumn(relation.partitionColumns)
+      val dataColumns = 
schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains)
+
+      HiveTable(
+        specifiedDatabase = Option(dbName),
+        name = tblName,
+        schema = dataColumns,
+        partitionColumns = partitionColumns,
+        tableType = tableType,
+        properties = tableProperties.toMap,
+        serdeProperties = options,
+        location = Some(relation.paths.head),
+        viewText = None, // TODO We need to place the SQL string here.
+        inputFormat = serde.inputFormat,
+        outputFormat = serde.outputFormat,
+        serde = serde.serde)
+    }
+
+    // TODO: Support persisting partitioned data source relations in Hive 
compatible format
+    val hiveTable = (maybeSerDe, dataSource.relation) match {
+      case (Some(serde), relation: HadoopFsRelation)
+          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        logInfo {
+          "Persisting data source relation with a single input path into Hive 
metastore in Hive " +
+            s"compatible format.  Input path: ${relation.paths.head}"
+        }
+        newHiveCompatibleMetastoreTable(relation, serde)
+
+      case (Some(serde), relation: HadoopFsRelation) if 
relation.partitionColumns.nonEmpty =>
+        logWarning {
+          val paths = relation.paths.mkString(", ")
+          "Persisting partitioned data source relation into Hive metastore in 
" +
+            s"Spark SQL specific format, which is NOT compatible with Hive.  
Input path(s): " +
+            paths.mkString("\n", "\n", "")
+        }
+        newSparkSQLSpecificMetastoreTable()
+
+      case (Some(serde), relation: HadoopFsRelation) =>
+        logWarning {
+          val paths = relation.paths.mkString(", ")
+          "Persisting data source relation with multiple input paths into Hive 
metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive.  
Input paths: " +
+            paths.mkString("\n", "\n", "")
+        }
+        newSparkSQLSpecificMetastoreTable()
+
+      case (Some(serde), _) =>
+        logWarning {
+          s"Data source relation is not a 
${classOf[HadoopFsRelation].getSimpleName}. " +
+            "Persisting it into Hive metastore in Spark SQL specific format, " 
+
+            "which is NOT compatible with Hive."
+        }
+        newSparkSQLSpecificMetastoreTable()
+
+      case _ =>
+        logWarning {
+          s"Couldn't find corresponding Hive SerDe for data source provider 
$provider. " +
+            "Persisting data source relation into Hive metastore in Spark SQL 
specific format, " +
+            "which is NOT compatible with Hive."
+        }
+        newSparkSQLSpecificMetastoreTable()
+    }
+
+    client.createTable(hiveTable)
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = {
@@ -463,7 +595,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
       case p @ CreateTableAsSelect(table, child, allowExisting) =>
-        val schema = if (table.schema.size > 0) {
+        val schema = if (table.schema.nonEmpty) {
           table.schema
         } else {
           child.output.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index f43e403..7d7b4b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -261,8 +262,8 @@ private[hive] object HiveQl extends Logging {
   /**
    * Returns the HiveConf
    */
-  private[this] def hiveConf(): HiveConf = {
-    val ss = SessionState.get() // SessionState is lazy initializaion, it can 
be null here
+  private[this] def hiveConf: HiveConf = {
+    val ss = SessionState.get() // SessionState is lazy initialization, it can 
be null here
     if (ss == null) {
       new HiveConf()
     } else {
@@ -604,38 +605,18 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         serde = None,
         viewText = None)
 
-      // default storage type abbriviation (e.g. RCFile, ORC, PARQUET etc.)
+      // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
       val defaultStorageType = 
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
-      // handle the default format for the storage type abbriviation
-      tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
-          tableDesc.copy(
-            inputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
-            outputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
-        } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
-          tableDesc.copy(
-            inputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
-            outputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
-            serde = 
Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
-        } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
-          tableDesc.copy(
-            inputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
-        } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
-          tableDesc.copy(
-            inputFormat =
-              
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat =
-              
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde =
-              
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
-        } else {
-          tableDesc.copy(
-            inputFormat =
-              Option("org.apache.hadoop.mapred.TextInputFormat"),
-            outputFormat =
-              Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-        }
+      // handle the default format for the storage type abbreviation
+      val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, 
hiveConf).getOrElse {
+        HiveSerDe(
+          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+
+      hiveSerDe.inputFormat.foreach(f => tableDesc = 
tableDesc.copy(inputFormat = Some(f)))
+      hiveSerDe.outputFormat.foreach(f => tableDesc = 
tableDesc.copy(outputFormat = Some(f)))
+      hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
 
       children.collect {
         case list @ Token("TOK_TABCOLLIST", _) =>
@@ -908,7 +889,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
                 }
                 (Nil, 
Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), serdeProps)
 
-              case Nil => (Nil, 
Option(hiveConf().getVar(ConfVars.HIVESCRIPTSERDE)), Nil)
+              case Nil => (Nil, 
Option(hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)), Nil)
             }
 
             val (inRowFormat, inSerdeClass, inSerdeProps) = 
matchSerDe(inputSerdeClause)

http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6fa5997..4a310ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -291,9 +291,11 @@ private[orc] case class OrcTableScan(
     // Sets requested columns
     addColumnIds(attributes, relation, conf)
 
-    if (inputPaths.nonEmpty) {
-      FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
+    if (inputPaths.isEmpty) {
+      // the input path probably be pruned, return an empty RDD.
+      return sqlContext.sparkContext.emptyRDD[InternalRow]
     }
+    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
 
     val inputFormatClass =
       classOf[OrcInputFormat]

http://git-wip-us.apache.org/repos/asf/spark/blob/92e8acc9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 983c013..332c3ec 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,31 +17,142 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.hive.test.TestHive
+import java.io.File
 
-import org.apache.spark.sql.test.ExamplePointUDT
+import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, 
ManagedTable}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.sources.DataSourceTest
+import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.{Logging, SparkFunSuite}
+
 
 class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging {
 
   test("struct field should accept underscore in sub-column name") {
-    val metastr = "struct<a: int, b_1: string, c: string>"
-
-    val datatype = HiveMetastoreTypes.toDataType(metastr)
-    assert(datatype.isInstanceOf[StructType])
+    val hiveTypeStr = "struct<a: int, b_1: string, c: string>"
+    val dateType = HiveMetastoreTypes.toDataType(hiveTypeStr)
+    assert(dateType.isInstanceOf[StructType])
   }
 
   test("udt to metastore type conversion") {
     val udt = new ExamplePointUDT
-    assert(HiveMetastoreTypes.toMetastoreType(udt) ===
-      HiveMetastoreTypes.toMetastoreType(udt.sqlType))
+    assertResult(HiveMetastoreTypes.toMetastoreType(udt.sqlType)) {
+      HiveMetastoreTypes.toMetastoreType(udt)
+    }
   }
 
   test("duplicated metastore relations") {
-    import TestHive.implicits._
-    val df = TestHive.sql("SELECT * FROM src")
+    val df = sql("SELECT * FROM src")
     logInfo(df.queryExecution.toString)
     df.as('a).join(df.as('b), $"a.key" === $"b.key")
   }
 }
+
+class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with 
SQLTestUtils {
+  override val sqlContext = TestHive
+
+  private val testDF = (1 to 2).map(i => (i, s"val_$i")).toDF("d1", 
"d2").coalesce(1)
+
+  Seq(
+    "parquet" -> (
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+    ),
+
+    "orc" -> (
+      "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+    )
+  ).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
+    test(s"Persist non-partitioned $provider relation into metastore as 
managed table") {
+      withTable("t") {
+        testDF
+          .write
+          .mode(SaveMode.Overwrite)
+          .format(provider)
+          .saveAsTable("t")
+
+        val hiveTable = catalog.client.getTable("default", "t")
+        assert(hiveTable.inputFormat === Some(inputFormat))
+        assert(hiveTable.outputFormat === Some(outputFormat))
+        assert(hiveTable.serde === Some(serde))
+
+        assert(!hiveTable.isPartitioned)
+        assert(hiveTable.tableType === ManagedTable)
+
+        val columns = hiveTable.schema
+        assert(columns.map(_.name) === Seq("d1", "d2"))
+        assert(columns.map(_.hiveType) === Seq("int", "string"))
+
+        checkAnswer(table("t"), testDF)
+        assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1", "2\tval_2"))
+      }
+    }
+
+    test(s"Persist non-partitioned $provider relation into metastore as 
external table") {
+      withTempPath { dir =>
+        withTable("t") {
+          val path = dir.getCanonicalFile
+
+          testDF
+            .write
+            .mode(SaveMode.Overwrite)
+            .format(provider)
+            .option("path", path.toString)
+            .saveAsTable("t")
+
+          val hiveTable = catalog.client.getTable("default", "t")
+          assert(hiveTable.inputFormat === Some(inputFormat))
+          assert(hiveTable.outputFormat === Some(outputFormat))
+          assert(hiveTable.serde === Some(serde))
+
+          assert(hiveTable.tableType === ExternalTable)
+          assert(hiveTable.location.get === 
path.toURI.toString.stripSuffix(File.separator))
+
+          val columns = hiveTable.schema
+          assert(columns.map(_.name) === Seq("d1", "d2"))
+          assert(columns.map(_.hiveType) === Seq("int", "string"))
+
+          checkAnswer(table("t"), testDF)
+          assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1", "2\tval_2"))
+        }
+      }
+    }
+
+    test(s"Persist non-partitioned $provider relation into metastore as 
managed table using CTAS") {
+      withTempPath { dir =>
+        withTable("t") {
+          val path = dir.getCanonicalPath
+
+          sql(
+            s"""CREATE TABLE t USING $provider
+               |OPTIONS (path '$path')
+               |AS SELECT 1 AS d1, "val_1" AS d2
+             """.stripMargin)
+
+          val hiveTable = catalog.client.getTable("default", "t")
+          assert(hiveTable.inputFormat === Some(inputFormat))
+          assert(hiveTable.outputFormat === Some(outputFormat))
+          assert(hiveTable.serde === Some(serde))
+
+          assert(hiveTable.isPartitioned === false)
+          assert(hiveTable.tableType === ExternalTable)
+          assert(hiveTable.partitionColumns.length === 0)
+
+          val columns = hiveTable.schema
+          assert(columns.map(_.name) === Seq("d1", "d2"))
+          assert(columns.map(_.hiveType) === Seq("int", "string"))
+
+          checkAnswer(table("t"), Row(1, "val_1"))
+          assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to