Repository: spark
Updated Branches:
  refs/heads/master f5d18af6a -> cca945b6a


[SPARK-18885][SQL] unify CREATE TABLE syntax for data source and hive serde 
tables

## What changes were proposed in this pull request?

Today we have different syntax to create data source or hive serde tables, we 
should unify them to not confuse users and step forward to make hive a data 
source.

Please read 
https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for  
details.

TODO(for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax, we should decide if we wanna 
add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. we should decide if we wanna change the behavior of `SET LOCATION`.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16296 from cloud-fan/create-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cca945b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cca945b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cca945b6

Branch: refs/heads/master
Commit: cca945b6aa679e61864c1cabae91e6ae7703362e
Parents: f5d18af
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Jan 5 17:40:27 2017 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Jan 5 17:40:27 2017 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  60 +++++++++--
 .../examples/sql/hive/JavaSparkHiveExample.java |   2 +-
 examples/src/main/python/sql/hive.py            |   2 +-
 examples/src/main/r/RSparkSQLExample.R          |   2 +-
 .../examples/sql/hive/SparkHiveExample.scala    |   2 +-
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  10 +-
 .../sql/catalyst/util/CaseInsensitiveMap.scala  |   2 +
 .../spark/sql/execution/SparkSqlParser.scala    |  57 +++++-----
 .../spark/sql/execution/SparkStrategies.scala   |   6 +-
 .../spark/sql/execution/command/ddl.scala       |   6 +-
 .../spark/sql/execution/datasources/rules.scala |   7 +-
 .../apache/spark/sql/internal/HiveSerDe.scala   |  84 +++++++++------
 .../sql/execution/SparkSqlParserSuite.scala     |   3 +-
 .../sql/execution/command/DDLCommandSuite.scala |  79 +++++++++++---
 .../spark/sql/hive/HiveExternalCatalog.scala    |   4 +-
 .../spark/sql/hive/HiveSessionState.scala       |   1 +
 .../apache/spark/sql/hive/HiveStrategies.scala  |  73 +++++++++++--
 .../spark/sql/hive/execution/HiveOptions.scala  | 102 ++++++++++++++++++
 .../spark/sql/hive/orc/OrcFileOperator.scala    |   2 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    | 107 +++++++++++++++++--
 .../sql/hive/HiveExternalCatalogSuite.scala     |   2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  15 ---
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  39 +++++++
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   2 -
 24 files changed, 526 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4cd21ae..0f6e344 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -522,14 +522,11 @@ Hive metastore. Persistent tables will still exist even 
after your Spark program
 long as you maintain your connection to the same metastore. A DataFrame for a 
persistent table can
 be created by calling the `table` method on a `SparkSession` with the name of 
the table.
 
-By default `saveAsTable` will create a "managed table", meaning that the 
location of the data will
-be controlled by the metastore. Managed tables will also have their data 
deleted automatically
-when a table is dropped.
-
-Currently, `saveAsTable` does not expose an API supporting the creation of an 
"external table" from a `DataFrame`.
-However, this functionality can be achieved by providing a `path` option to 
the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (a string) when saving the 
table with `saveAsTable`. When an External table
-is dropped only its metadata is removed.
+For file-based data source, e.g. text, parquet, json, etc. you can specify a 
custom table path via the
+`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. 
When the table is dropped,
+the custom table path will not be removed and the table data is still there. 
If no custom table path is
+specifed, Spark will write data to a default table path under the warehouse 
directory. When the table is
+dropped, the default table path will be removed too.
 
 Starting from Spark 2.1, persistent datasource tables have per-partition 
metadata stored in the Hive metastore. This brings several benefits:
 
@@ -954,6 +951,53 @@ adds support for finding tables in the MetaStore and 
writing queries using HiveQ
 </div>
 </div>
 
+### Specifying storage format for Hive tables
+
+When you create a Hive table, you need to define how this table should 
read/write data from/to file system,
+i.e. the "input format" and "output format". You also need to define how this 
table should deserialize the data
+to rows, or serialize rows to data, i.e. the "serde". The following options 
can be used to specify the storage
+format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id 
int) USING hive OPTIONS(fileFormat 'parquet')`.
+By default, we will read the table files as plain text. Note that, Hive 
storage handler is not supported yet when
+creating table, you can create a table using storage handler at Hive side, and 
use Spark SQL to read it.
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>fileFormat</code></td>
+    <td>
+      A fileFormat is kind of a package of storage format specifications, 
including "serde", "input format" and
+      "output format". Currently we support 6 fileFormats: 'sequencefile', 
'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>inputFormat, outputFormat</code></td>
+    <td>
+      These 2 options specify the name of a corresponding `InputFormat` and 
`OutputFormat` class as a string literal,
+      e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options 
must be appeared in pair, and you can not
+      specify them if you already specified the `fileFormat` option.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>serde</code></td>
+    <td>
+      This option specifies the name of a serde class. When the `fileFormat` 
option is specified, do not specify this option
+      if the given `fileFormat` already include the information of serde. 
Currently "sequencefile", "textfile" and "rcfile"
+      don't include the serde information and you can use this option with 
these 3 fileFormats.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, 
lineDelim</code></td>
+    <td>
+      These options can only be used with "textfile" fileFormat. They define 
how to read delimited files into rows.
+    </td>
+  </tr>
+</table>
+
+All other properties defined with `OPTIONS` will be regarded as Hive serde 
properties.
+
 ### Interacting with Different Versions of Hive Metastore
 
 One of the most important pieces of Spark SQL's Hive support is interaction 
with Hive metastore,

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
 
b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 052153c..8d06d38 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -64,7 +64,7 @@ public class JavaSparkHiveExample {
       .enableHiveSupport()
       .getOrCreate();
 
-    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING 
hive");
     spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' 
INTO TABLE src");
 
     // Queries are expressed in HiveQL

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/examples/src/main/python/sql/hive.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/hive.py 
b/examples/src/main/python/sql/hive.py
index ad83fe1..ba01544 100644
--- a/examples/src/main/python/sql/hive.py
+++ b/examples/src/main/python/sql/hive.py
@@ -44,7 +44,7 @@ if __name__ == "__main__":
         .getOrCreate()
 
     # spark is an existing SparkSession
-    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING 
hive")
     spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' 
INTO TABLE src")
 
     # Queries are expressed in HiveQL

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/examples/src/main/r/RSparkSQLExample.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/RSparkSQLExample.R 
b/examples/src/main/r/RSparkSQLExample.R
index 373a36d..e647f0e 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -195,7 +195,7 @@ head(teenagers)
 # $example on:spark_hive$
 # enableHiveSupport defaults to TRUE
 sparkR.session(enableHiveSupport = TRUE)
-sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
 sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE 
src")
 
 # Queries can be expressed in HiveQL.

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index ded18da..d29ed95 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -50,7 +50,7 @@ object SparkHiveExample {
     import spark.implicits._
     import spark.sql
 
-    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
     sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO 
TABLE src")
 
     // Queries are expressed in HiveQL

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a34087c..3222a9c 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -69,16 +69,18 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     
#setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      
#dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS tablePropertyList)?
+        (OPTIONS options=tablePropertyList)?
         (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec? (AS? query)?                                       
#createTableUsing
+        bucketSpec? locationSpec?
+        (COMMENT comment=STRING)?
+        (AS? query)?                                                   
#createTable
     | createTableHeader ('(' columns=colTypeList ')')?
-        (COMMENT STRING)?
+        (COMMENT comment=STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
         bucketSpec? skewSpec?
         rowFormat?  createFileFormat? locationSpec?
         (TBLPROPERTIES tablePropertyList)?
-        (AS? query)?                                                   
#createTable
+        (AS? query)?                                                   
#createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier                                    
#createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index a7f7a8a..29e49a5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends 
Map[String, String]
 
   override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
 
+  override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)
+
   override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
     baseMap + kv.copy(_1 = kv._1.toLowerCase)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 14a983e..41768d4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -342,11 +342,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create a data source table, returning a [[CreateTable]] logical plan.
+   * Create a table, returning a [[CreateTable]] logical plan.
    *
    * Expected format:
    * {{{
-   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   USING table_provider
    *   [OPTIONS table_property_list]
    *   [PARTITIONED BY (col_name, col_name, ...)]
@@ -354,19 +354,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
    *    [SORTED BY (col_name [ASC|DESC], ...)]
    *    INTO num_buckets BUCKETS
    *   ]
+   *   [LOCATION path]
+   *   [COMMENT table_comment]
    *   [AS select_statement];
    * }}}
    */
-  override def visitCreateTableUsing(ctx: CreateTableUsingContext): 
LogicalPlan = withOrigin(ctx) {
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = 
withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = 
visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }
-    val options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = 
Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
-    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with CREATE 
TABLE USING")
-    }
     val schema = Option(ctx.colTypeList()).map(createSchema)
     val partitionColumnNames =
       Option(ctx.partitionColumnNames)
@@ -374,10 +373,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
         .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
-    // TODO: this may be wrong for non file-based data source like JDBC, which 
should be external
-    // even there is no `path` in options. We should consider allow the 
EXTERNAL keyword.
+    val location = Option(ctx.locationSpec).map(visitLocationSpec)
     val storage = DataSource.buildStorageFormatFromOptions(options)
-    val tableType = if (storage.locationUri.isDefined) {
+
+    if (location.isDefined && storage.locationUri.isDefined) {
+      throw new ParseException(
+        "LOCATION and 'path' in OPTIONS are both used to indicate the custom 
table path, " +
+          "you can only specify one of them.", ctx)
+    }
+    val customLocation = storage.locationUri.orElse(location)
+
+    val tableType = if (customLocation.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
@@ -386,12 +392,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = storage,
+      storage = storage.copy(locationUri = customLocation),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,
-      bucketSpec = bucketSpec
-    )
+      bucketSpec = bucketSpec,
+      comment = Option(ctx.comment).map(string))
 
     // Determine the storage mode.
     val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -1011,10 +1017,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create a table, returning a [[CreateTable]] logical plan.
+   * Create a Hive serde table, returning a [[CreateTable]] logical plan.
    *
-   * This is not used to create datasource tables, which is handled through
-   * "CREATE TABLE ... USING ...".
+   * This is a legacy syntax for Hive compatibility, we recommend users to use 
the Spark SQL
+   * CREATE TABLE syntax to create Hive serde table, e.g. "CREATE TABLE ... 
USING hive ..."
    *
    * Note: several features are currently not supported - temporary tables, 
bucketing,
    * skewed columns and storage handlers (STORED BY).
@@ -1032,7 +1038,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
    *   [AS select_statement];
    * }}}
    */
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = 
withOrigin(ctx) {
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan 
= withOrigin(ctx) {
     val (name, temp, ifNotExists, external) = 
visitCreateTableHeader(ctx.createTableHeader)
     // TODO: implement temporary tables
     if (temp) {
@@ -1046,7 +1052,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
     if (ctx.bucketSpec != null) {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
     }
-    val comment = Option(ctx.STRING).map(string)
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = 
Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -1057,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
     val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
-    val defaultStorage: CatalogStorageFormat = {
-      val defaultStorageType = conf.getConfString("hive.default.fileformat", 
"textfile")
-      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
-      CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-          
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        serde = defaultHiveSerde.flatMap(_.serde),
-        compressed = false,
-        properties = Map())
-    }
+    val defaultStorage = HiveSerDe.getDefaultStorage(conf)
     validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(CatalogStorageFormat.empty)
@@ -1104,7 +1097,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
       provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
-      comment = comment)
+      comment = Option(ctx.comment).map(string))
 
     val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 28808f8..1257d17 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -408,8 +408,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None)
-        if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, mode, None) if 
DDLUtils.isHiveTable(tableDesc) =>
         val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == 
SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -421,8 +420,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       // CREATE TABLE ... AS SELECT ... for hive serde table is handled in 
hive module, by rule
       // `CreateTables`
 
-      case CreateTable(tableDesc, mode, Some(query))
-        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isDatasourceTable(tableDesc) =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             tableDesc,

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 59a29e8..82cbb4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -762,8 +762,12 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
   val HIVE_PROVIDER = "hive"
 
+  def isHiveTable(table: CatalogTable): Boolean = {
+    table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER
+  }
+
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
+    table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 07b1667..94ba814 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) 
extends Rule[LogicalPl
         throw new AnalysisException("Saving data into a view is not allowed.")
       }
 
-      if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(existingTable)) {
         throw new AnalysisException(s"Saving data in the Hive serde table 
$tableName is " +
           "not supported yet. Please use the insertInto() API as an 
alternative.")
       }
@@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) 
extends Rule[LogicalPl
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(table)) {
         // When we hit this branch, it means users didn't specify schema for 
the table to be
         // created, as we always include partition columns in table schema for 
hive serde tables.
         // The real schema will be inferred at hive metastore by hive serde, 
plus the given
@@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends 
Rule[LogicalPlan] {
 object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case CreateTable(tableDesc, _, Some(_))
-          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, _, Some(_)) if 
DDLUtils.isHiveTable(tableDesc) =>
         throw new AnalysisException("Hive support is required to use CREATE 
Hive TABLE AS SELECT")
 
       case _ => // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 52e648a..ca46a11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -17,12 +17,49 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+
 case class HiveSerDe(
   inputFormat: Option[String] = None,
   outputFormat: Option[String] = None,
   serde: Option[String] = None)
 
 object HiveSerDe {
+  val serdeMap = Map(
+    "sequencefile" ->
+      HiveSerDe(
+        inputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
+
+    "rcfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+        serde = 
Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
+
+    "orc" ->
+      HiveSerDe(
+        inputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+
+    "parquet" ->
+      HiveSerDe(
+        inputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = 
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+    "textfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+    "avro" ->
+      HiveSerDe(
+        inputFormat = 
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+        outputFormat = 
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
+
   /**
    * Get the Hive SerDe information from the data source abbreviation string 
or classname.
    *
@@ -31,41 +68,6 @@ object HiveSerDe {
    * @return HiveSerDe associated with the specified source
    */
   def sourceToSerDe(source: String): Option[HiveSerDe] = {
-    val serdeMap = Map(
-      "sequencefile" ->
-        HiveSerDe(
-          inputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
-
-      "rcfile" ->
-        HiveSerDe(
-          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
-          serde = 
Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
-
-      "orc" ->
-        HiveSerDe(
-          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
-
-      "parquet" ->
-        HiveSerDe(
-          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = 
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
-
-      "textfile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-
-      "avro" ->
-        HiveSerDe(
-          inputFormat = 
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
-          outputFormat = 
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
-
     val key = source.toLowerCase match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
       case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
@@ -77,4 +79,16 @@ object HiveSerDe {
 
     serdeMap.get(key)
   }
+
+  def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
+    val defaultStorageType = conf.getConfString("hive.default.fileformat", 
"textfile")
+    val defaultHiveSerde = sourceToSerDe(defaultStorageType)
+    CatalogStorageFormat.empty.copy(
+      inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+        .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+      outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+        
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+      serde = defaultHiveSerde.flatMap(_.serde)
+        .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index b070138..15e490f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest {
       tableType: CatalogTableType = CatalogTableType.MANAGED,
       storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
         inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
-        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
+        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
       schema: StructType = new StructType,
       provider: Option[String] = Some("hive"),
       partitionColumnNames: Seq[String] = Seq.empty,

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 1a5e522..76bb9e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -236,7 +236,7 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed4, expected4)
   }
 
-  test("create table - table file format") {
+  test("create hive table - table file format") {
     val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", 
"avrofile",
       "sequencefile", "rcfile", "textfile")
 
@@ -245,13 +245,14 @@ class DDLCommandSuite extends PlanTest {
       val ct = parseAs[CreateTable](query)
       val hiveSerde = HiveSerDe.sourceToSerDe(s)
       assert(hiveSerde.isDefined)
-      assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+      assert(ct.tableDesc.storage.serde ==
+        
hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
       assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
       assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
     }
   }
 
-  test("create table - row format and table file format") {
+  test("create hive table - row format and table file format") {
     val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
     val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 
'outputfmt'"
     val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
@@ -262,13 +263,15 @@ class DDLCommandSuite extends PlanTest {
     assert(parsed1.tableDesc.storage.serde == Some("anything"))
     assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+
     val parsed2 = parseAs[CreateTable](query2)
-    assert(parsed2.tableDesc.storage.serde.isEmpty)
+    assert(parsed2.tableDesc.storage.serde ==
+      Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
   }
 
-  test("create table - row format serde and generic file format") {
+  test("create hive table - row format serde and generic file format") {
     val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", 
"textfile")
     val supportedSources = Set("sequencefile", "rcfile", "textfile")
 
@@ -287,7 +290,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create table - row format delimited and generic file format") {
+  test("create hive table - row format delimited and generic file format") {
     val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", 
"textfile")
     val supportedSources = Set("textfile")
 
@@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest {
         val ct = parseAs[CreateTable](query)
         val hiveSerde = HiveSerDe.sourceToSerDe(s)
         assert(hiveSerde.isDefined)
-        assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+        assert(ct.tableDesc.storage.serde ==
+          
hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
         assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
         assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
       } else {
@@ -306,7 +310,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create external table - location must be specified") {
+  test("create hive external table - location must be specified") {
     assertUnsupported(
       sql = "CREATE EXTERNAL TABLE my_tab",
       containsThesePhrases = Seq("create external table", "location"))
@@ -316,7 +320,7 @@ class DDLCommandSuite extends PlanTest {
     assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
-  test("create table - property values must be set") {
+  test("create hive table - property values must be set") {
     assertUnsupported(
       sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 
'key_with_value'='x')",
       containsThesePhrases = Seq("key_without_value"))
@@ -326,14 +330,14 @@ class DDLCommandSuite extends PlanTest {
       containsThesePhrases = Seq("key_without_value"))
   }
 
-  test("create table - location implies external") {
+  test("create hive table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
     val ct = parseAs[CreateTable](query)
     assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
     assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
-  test("create table using - with partitioned by") {
+  test("create table - with partitioned by") {
     val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
       "USING parquet PARTITIONED BY (a)"
 
@@ -357,7 +361,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create table using - with bucket") {
+  test("create table - with bucket") {
     val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
       "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
 
@@ -379,6 +383,57 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
+  test("create table - with comment") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 
'abc'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      comment = Some("abc"))
+
+    parser.parsePlan(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = 
tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with location") {
+    val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION 
'/tmp/file'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat.empty.copy(locationUri = 
Some("/tmp/file")),
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"))
+
+    parser.parsePlan(v1) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = 
tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $v1")
+    }
+
+    val v2 =
+      """
+        |CREATE TABLE my_tab(a INT, b STRING)
+        |USING parquet
+        |OPTIONS (path '/tmp/file')
+        |LOCATION '/tmp/file'
+      """.stripMargin
+    val e = intercept[ParseException] {
+      parser.parsePlan(v2)
+    }
+    assert(e.message.contains("you can only specify one of them."))
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fde6d4a..474a2c8 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -215,7 +215,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         tableDefinition.storage.locationUri
       }
 
-      if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(tableDefinition)) {
         val tableWithDataSourceProps = tableDefinition.copy(
           // We can't leave `locationUri` empty and count on Hive metastore to 
set a default table
           // location, because Hive metastore uses 
hive.metastore.warehouse.dir to generate default
@@ -533,7 +533,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     } else {
       val oldTableDef = getRawTable(db, withStatsProps.identifier.table)
 
-      val newStorage = if (tableDefinition.provider.get == 
DDLUtils.HIVE_PROVIDER) {
+      val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
         tableDefinition.storage
       } else {
         // We can't alter the table storage of data source table directly for 
2 reasons:

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bea073f..52892f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -64,6 +64,7 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
+        new DetermineHiveSerde(conf) ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil 
else Nil)
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 773c4a3..6d5cc57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,14 +18,73 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+
+
+/**
+ * Determine the serde/format of the Hive serde table, according to the 
storage properties.
+ */
+class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && 
t.storage.serde.isEmpty =>
+      if (t.bucketSpec.isDefined) {
+        throw new AnalysisException("Creating bucketed Hive serde table is not 
supported yet.")
+      }
+      if (t.partitionColumnNames.nonEmpty && query.isDefined) {
+        val errorMessage = "A Create Table As Select (CTAS) statement is not 
allowed to " +
+          "create a partitioned table using Hive's file formats. " +
+          "Please use the syntax of \"CREATE TABLE tableName USING dataSource 
" +
+          "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table 
through a " +
+          "CTAS statement."
+        throw new AnalysisException(errorMessage)
+      }
+
+      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+      val options = new HiveOptions(t.storage.properties)
+
+      val fileStorage = if (options.fileFormat.isDefined) {
+        HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
+          case Some(s) =>
+            CatalogStorageFormat.empty.copy(
+              inputFormat = s.inputFormat,
+              outputFormat = s.outputFormat,
+              serde = s.serde)
+          case None =>
+            throw new IllegalArgumentException(s"invalid fileFormat: 
'${options.fileFormat.get}'")
+        }
+      } else if (options.hasInputOutputFormat) {
+        CatalogStorageFormat.empty.copy(
+          inputFormat = options.inputFormat,
+          outputFormat = options.outputFormat)
+      } else {
+        CatalogStorageFormat.empty
+      }
+
+      val rowStorage = if (options.serde.isDefined) {
+        CatalogStorageFormat.empty.copy(serde = options.serde)
+      } else {
+        CatalogStorageFormat.empty
+      }
+
+      val storage = t.storage.copy(
+        inputFormat = 
fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+        outputFormat = 
fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+        serde = 
rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+        properties = options.serdeProperties)
+
+      c.copy(tableDesc = t.copy(storage = storage))
+  }
+}
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -49,15 +108,7 @@ private[hive] trait HiveStrategies {
         InsertIntoHiveTable(
           table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get 
== "hive" =>
-        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
-          // add default serde
-          tableDesc.withNewStorage(
-            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-        } else {
-          tableDesc
-        }
-
+      case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
         // Currently we will never hit this branch, as SQL string API can only 
use `Ignore` or
         // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't 
support hive serde
         // tables yet.
@@ -68,7 +119,7 @@ private[hive] trait HiveStrategies {
 
         val dbName = 
tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
         val cmd = CreateHiveTableAsSelectCommand(
-          newTableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+          tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
           query,
           mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
new file mode 100644
index 0000000..35b7a68
--- /dev/null
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the Hive data source. Note that rule `DetermineHiveSerde` will 
extract Hive
+ * serde/format information from these options.
+ */
+class HiveOptions(@transient private val parameters: CaseInsensitiveMap) 
extends Serializable {
+  import HiveOptions._
+
+  def this(parameters: Map[String, String]) = this(new 
CaseInsensitiveMap(parameters))
+
+  val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase)
+  val inputFormat = parameters.get(INPUT_FORMAT)
+  val outputFormat = parameters.get(OUTPUT_FORMAT)
+
+  if (inputFormat.isDefined != outputFormat.isDefined) {
+    throw new IllegalArgumentException("Cannot specify only inputFormat or 
outputFormat, you " +
+      "have to specify both of them.")
+  }
+
+  def hasInputOutputFormat: Boolean = inputFormat.isDefined
+
+  if (fileFormat.isDefined && inputFormat.isDefined) {
+    throw new IllegalArgumentException("Cannot specify fileFormat and 
inputFormat/outputFormat " +
+      "together for Hive data source.")
+  }
+
+  val serde = parameters.get(SERDE)
+
+  if (fileFormat.isDefined && serde.isDefined) {
+    if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) {
+      throw new IllegalArgumentException(
+        s"fileFormat '${fileFormat.get}' already specifies a serde.")
+    }
+  }
+
+  val containsDelimiters = delimiterOptions.keys.exists(parameters.contains)
+
+  if (containsDelimiters) {
+    if (serde.isDefined) {
+      throw new IllegalArgumentException("Cannot specify delimiters with a 
custom serde.")
+    }
+    if (fileFormat.isEmpty) {
+      throw new IllegalArgumentException("Cannot specify delimiters without 
fileFormat.")
+    }
+    if (fileFormat.get != "textfile") {
+      throw new IllegalArgumentException("Cannot specify delimiters as they 
are only compatible " +
+        s"with fileFormat 'textfile', not ${fileFormat.get}.")
+    }
+  }
+
+  for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") {
+    throw new IllegalArgumentException("Hive data source only support newline 
'\\n' as " +
+      s"line delimiter, but given: $lineDelim.")
+  }
+
+  def serdeProperties: Map[String, String] = parameters.filterKeys {
+    k => !lowerCasedOptionNames.contains(k.toLowerCase)
+  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }
+}
+
+object HiveOptions {
+  private val lowerCasedOptionNames = collection.mutable.Set[String]()
+
+  private def newOption(name: String): String = {
+    lowerCasedOptionNames += name.toLowerCase
+    name
+  }
+
+  val FILE_FORMAT = newOption("fileFormat")
+  val INPUT_FORMAT = newOption("inputFormat")
+  val OUTPUT_FORMAT = newOption("outputFormat")
+  val SERDE = newOption("serde")
+
+  // A map from the public delimiter option keys to the underlying Hive serde 
property keys.
+  val delimiterOptions = Map(
+    "fieldDelim" -> "field.delim",
+    "escapeDelim" -> "escape.delim",
+    // The following typo is inherited from Hive...
+    "collectionDelim" -> "colelction.delim",
+    "mapkeyDelim" -> "mapkey.delim",
+    "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase -> v }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3f1f86c..5a3fcd7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging {
+private[hive] object OrcFileOperator extends Logging {
   /**
    * Retrieves an ORC file reader from a given path.  The path can point to 
either a directory or a
    * single ORC file.  If it points to a directory, it picks any non-empty ORC 
file within that

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index d13e29b..b67e5f6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Generate, 
ScriptTransformati
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
@@ -51,6 +50,12 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils 
with TestHiveSingle
     assert(e.getMessage.toLowerCase.contains("operation not allowed"))
   }
 
+  private def analyzeCreateTable(sql: String): CatalogTable = {
+    TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
+      case CreateTable(tableDesc, mode, _) => tableDesc
+    }.head
+  }
+
   test("Test CTAS #1") {
     val s1 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@@ -76,7 +81,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils 
with TestHiveSingle
     assert(desc.storage.outputFormat == 
Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
       Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
-    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+    assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
   }
 
   test("Test CTAS #2") {
@@ -107,7 +112,7 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
     assert(desc.storage.inputFormat == 
Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == 
Some("parquet.hive.DeprecatedParquetOutputFormat"))
     assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
-    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+    assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
   }
 
   test("Test CTAS #3") {
@@ -125,7 +130,7 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
     assert(desc.storage.inputFormat == 
Some("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.storage.outputFormat ==
       Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc.properties == Map())
   }
 
@@ -305,7 +310,7 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
       Some("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.storage.outputFormat ==
       Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc.storage.properties.isEmpty)
     assert(desc.properties.isEmpty)
     assert(desc.comment.isEmpty)
@@ -412,7 +417,7 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
     val (desc2, _) = extractTableDesc(query2)
     assert(desc1.storage.inputFormat == Some("winput"))
     assert(desc1.storage.outputFormat == Some("wowput"))
-    assert(desc1.storage.serde.isEmpty)
+    assert(desc1.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc2.storage.inputFormat == 
Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
     assert(desc2.storage.outputFormat == 
Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
     assert(desc2.storage.serde == 
Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
@@ -592,4 +597,94 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
     val hiveClient = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
     assert(hiveClient.getConf("hive.in.test", "") == "true")
   }
+
+  test("create hive serde table with new syntax - basic") {
+    val sql =
+      """
+        |CREATE TABLE t
+        |(id int, name string COMMENT 'blabla')
+        |USING hive
+        |OPTIONS (fileFormat 'parquet', my_prop 1)
+        |LOCATION '/tmp/file'
+        |COMMENT 'BLABLA'
+      """.stripMargin
+
+    val table = analyzeCreateTable(sql)
+    assert(table.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string", nullable = true, comment = "blabla"))
+    assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
+    assert(table.storage.locationUri == Some("/tmp/file"))
+    assert(table.storage.properties == Map("my_prop" -> "1"))
+    assert(table.comment == Some("BLABLA"))
+
+    assert(table.storage.inputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+    assert(table.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+    assert(table.storage.serde ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+  }
+
+  test("create hive serde table with new syntax - with partition and 
bucketing") {
+    val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)"
+    val table = analyzeCreateTable(v1)
+    assert(table.schema == new StructType().add("c1", "int").add("c2", "int"))
+    assert(table.partitionColumnNames == Seq("c2"))
+    // check the default formats
+    assert(table.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(table.storage.inputFormat == 
Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(table.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+
+    val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) 
INTO 4 BUCKETS"
+    val e2 = intercept[AnalysisException](analyzeCreateTable(v2))
+    assert(e2.message.contains("Creating bucketed Hive serde table is not 
supported yet"))
+
+    val v3 =
+      """
+        |CREATE TABLE t (c1 int, c2 int) USING hive
+        |PARTITIONED BY (c2)
+        |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin
+    val e3 = intercept[AnalysisException](analyzeCreateTable(v3))
+    assert(e3.message.contains("Creating bucketed Hive serde table is not 
supported yet"))
+  }
+
+  test("create hive serde table with new syntax - Hive options error 
checking") {
+    val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')"
+    val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1))
+    assert(e1.getMessage.contains("Cannot specify only inputFormat or 
outputFormat"))
+
+    val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " +
+      "(fileFormat 'x', inputFormat 'a', outputFormat 'b')"
+    val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2))
+    assert(e2.getMessage.contains(
+      "Cannot specify fileFormat and inputFormat/outputFormat together"))
+
+    val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 
'parquet', serde 'a')"
+    val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3))
+    assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a 
serde"))
+
+    val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', 
fieldDelim ' ')"
+    val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4))
+    assert(e4.getMessage.contains("Cannot specify delimiters with a custom 
serde"))
+
+    val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')"
+    val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5))
+    assert(e5.getMessage.contains("Cannot specify delimiters without 
fileFormat"))
+
+    val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 
'parquet', fieldDelim ' ')"
+    val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6))
+    assert(e6.getMessage.contains(
+      "Cannot specify delimiters as they are only compatible with fileFormat 
'textfile'"))
+
+    // The value of 'fileFormat' option is case-insensitive.
+    val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 
'TEXTFILE', lineDelim ',')"
+    val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7))
+    assert(e7.getMessage.contains("Hive data source only support newline '\\n' 
as line delimiter"))
+
+    val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')"
+    val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8))
+    assert(e8.getMessage.contains("invalid fileFormat: 'wrong'"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 6fee458..2f02bb5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -68,6 +68,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
     val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
     
assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
-    assert(externalCatalog.getTable("db1", "hive_tbl").provider == 
Some(DDLUtils.HIVE_PROVIDER))
+    assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2b8d4e2..aed825e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1189,21 +1189,6 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
     }
   }
 
-  test("create a data source table using hive") {
-    val tableName = "tab1"
-    withTable (tableName) {
-      val e = intercept[AnalysisException] {
-        sql(
-          s"""
-             |CREATE TABLE $tableName
-             |(col1 int)
-             |USING hive
-           """.stripMargin)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with CREATE TABLE 
USING"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 8b34219..3ac07d0 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, Cat
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -1250,4 +1251,42 @@ class HiveDDLSuite
       assert(e.message.contains("unknown is not a valid partition column"))
     }
   }
+
+  test("create hive serde table with new syntax") {
+    withTable("t", "t2", "t3") {
+      withTempPath { path =>
+        sql(
+          s"""
+            |CREATE TABLE t(id int) USING hive
+            |OPTIONS(fileFormat 'orc', compression 'Zlib')
+            |LOCATION '${path.getCanonicalPath}'
+          """.stripMargin)
+        val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.serde == 
Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+        assert(table.storage.properties.get("compression") == Some("Zlib"))
+        assert(spark.table("t").collect().isEmpty)
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+        // Check if this is compressed as ZLIB.
+        val maybeOrcFile = 
path.listFiles().find(_.getName.endsWith("part-00000"))
+        assert(maybeOrcFile.isDefined)
+        val orcFilePath = maybeOrcFile.get.toPath.toString
+        val expectedCompressionKind =
+          OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+        assert("ZLIB" === expectedCompressionKind.name())
+
+        sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
+        val table2 = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
+        assert(DDLUtils.isHiveTable(table2))
+        assert(table2.storage.serde == 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+        checkAnswer(spark.table("t2"), Row(1, "a"))
+
+        sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)")
+        sql("INSERT INTO t3 PARTITION(p=1) SELECT 0")
+        checkAnswer(spark.table("t3"), Row(0, 1))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cca945b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 463c368..e678cf6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -93,8 +93,6 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val conf = spark.sessionState.newHadoopConf()
-      val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new 
File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
       val orcFilePath = maybeOrcFile.get.toPath.toString


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to